diff --git a/docs/mineru_integration_en.md b/docs/mineru_integration_en.md index b75527f5..472c1193 100644 --- a/docs/mineru_integration_en.md +++ b/docs/mineru_integration_en.md @@ -257,7 +257,7 @@ The processors support different types of content: - `ImageModalProcessor`: Processes images with captions and footnotes - `TableModalProcessor`: Processes tables with captions and footnotes - `EquationModalProcessor`: Processes mathematical equations in LaTeX format -- `GenericModalProcessor`: A base processor that can be extended for custom content types +- `GenericModalProcessor`: A base processor that can be extended for custom content types > **Note**: A complete working example can be found in `examples/modalprocessors_example.py`. You can run it using: > ```bash @@ -357,4 +357,4 @@ description, entity_info = await equation_processor.process_multimodal_content( ) ``` - \ No newline at end of file + diff --git a/docs/mineru_integration_zh.md b/docs/mineru_integration_zh.md index a168c122..0bfb988a 100644 --- a/docs/mineru_integration_zh.md +++ b/docs/mineru_integration_zh.md @@ -256,7 +256,7 @@ MinerU 配置文件 `magic-pdf.json` 支持多种自定义选项,包括: - `ImageModalProcessor`:处理带有标题和脚注的图像 - `TableModalProcessor`:处理带有标题和脚注的表格 - `EquationModalProcessor`:处理 LaTeX 格式的数学公式 -- `GenericModalProcessor`:可用于扩展自定义内容类型的基础处理器 +- `GenericModalProcessor`:可用于扩展自定义内容类型的基础处理器 > **注意**:完整的可运行示例可以在 `examples/modalprocessors_example.py` 中找到。您可以使用以下命令运行它: > ```bash @@ -355,4 +355,4 @@ description, entity_info = await equation_processor.process_multimodal_content( entity_name="质能方程" ) ``` - \ No newline at end of file + diff --git a/examples/mineru_example.py b/examples/mineru_example.py index 7d6dc87c..79dc070c 100644 --- a/examples/mineru_example.py +++ b/examples/mineru_example.py @@ -10,13 +10,15 @@ This example shows how to: import os import argparse -from pathlib import Path from lightrag.mineru_parser import MineruParser -def parse_document(file_path: str, output_dir: str = None, method: str = "auto", stats: bool = False): + +def parse_document( + file_path: str, output_dir: str = None, method: str = "auto", stats: bool = False +): """ Parse a document using MinerU parser - + Args: file_path: Path to the document output_dir: Output directory for parsed results @@ -26,22 +28,20 @@ def parse_document(file_path: str, output_dir: str = None, method: str = "auto", try: # Parse the document content_list, md_content = MineruParser.parse_document( - file_path=file_path, - parse_method=method, - output_dir=output_dir + file_path=file_path, parse_method=method, output_dir=output_dir ) # Display statistics if requested if stats: print("\nDocument Statistics:") print(f"Total content blocks: {len(content_list)}") - + # Count different types of content content_types = {} for item in content_list: - content_type = item.get('type', 'unknown') + content_type = item.get("type", "unknown") content_types[content_type] = content_types.get(content_type, 0) + 1 - + print("\nContent Type Distribution:") for content_type, count in content_types.items(): print(f"- {content_type}: {count}") @@ -52,17 +52,22 @@ def parse_document(file_path: str, output_dir: str = None, method: str = "auto", print(f"Error parsing document: {str(e)}") return None, None + def main(): """Main function to run the example""" - parser = argparse.ArgumentParser(description='MinerU Parser Example') - parser.add_argument('file_path', help='Path to the document to parse') - parser.add_argument('--output', '-o', help='Output directory path') - parser.add_argument('--method', 
'-m', - choices=['auto', 'ocr', 'txt'], - default='auto', - help='Parsing method (auto, ocr, txt)') - parser.add_argument('--stats', action='store_true', - help='Display content statistics') + parser = argparse.ArgumentParser(description="MinerU Parser Example") + parser.add_argument("file_path", help="Path to the document to parse") + parser.add_argument("--output", "-o", help="Output directory path") + parser.add_argument( + "--method", + "-m", + choices=["auto", "ocr", "txt"], + default="auto", + help="Parsing method (auto, ocr, txt)", + ) + parser.add_argument( + "--stats", action="store_true", help="Display content statistics" + ) args = parser.parse_args() @@ -72,11 +77,9 @@ def main(): # Parse document content_list, md_content = parse_document( - args.file_path, - args.output, - args.method, - args.stats + args.file_path, args.output, args.method, args.stats ) -if __name__ == '__main__': - main() \ No newline at end of file + +if __name__ == "__main__": + main() diff --git a/examples/modalprocessors_example.py b/examples/modalprocessors_example.py index f25530fb..ebd258e9 100644 --- a/examples/modalprocessors_example.py +++ b/examples/modalprocessors_example.py @@ -8,94 +8,112 @@ import asyncio import argparse from lightrag.llm.openai import openai_complete_if_cache, openai_embed from lightrag.kg.shared_storage import initialize_pipeline_status -from pathlib import Path from lightrag import LightRAG from lightrag.modalprocessors import ( ImageModalProcessor, TableModalProcessor, EquationModalProcessor, - GenericModalProcessor ) WORKING_DIR = "./rag_storage" -def get_llm_model_func(api_key: str, base_url: str = None): - return lambda prompt, system_prompt=None, history_messages=[], **kwargs: openai_complete_if_cache( - "gpt-4o-mini", - prompt, - system_prompt=system_prompt, - history_messages=history_messages, - api_key=api_key, - base_url=base_url, - **kwargs, - ) -def get_vision_model_func(api_key: str, base_url: str = None): - return lambda prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs: openai_complete_if_cache( - "gpt-4o", - "", +def get_llm_model_func(api_key: str, base_url: str = None): + return ( + lambda prompt, system_prompt=None, history_messages=[], - messages=[ - {"role": "system", "content": system_prompt} if system_prompt else None, - {"role": "user", "content": [ - {"type": "text", "text": prompt}, - { - "type": "image_url", - "image_url": { - "url": f"data:image/jpeg;base64,{image_data}" - } - } - ]} if image_data else {"role": "user", "content": prompt} - ], - api_key=api_key, - base_url=base_url, - **kwargs, - ) if image_data else openai_complete_if_cache( - "gpt-4o-mini", - prompt, - system_prompt=system_prompt, - history_messages=history_messages, - api_key=api_key, - base_url=base_url, - **kwargs, + **kwargs: openai_complete_if_cache( + "gpt-4o-mini", + prompt, + system_prompt=system_prompt, + history_messages=history_messages, + api_key=api_key, + base_url=base_url, + **kwargs, + ) ) + +def get_vision_model_func(api_key: str, base_url: str = None): + return ( + lambda prompt, + system_prompt=None, + history_messages=[], + image_data=None, + **kwargs: openai_complete_if_cache( + "gpt-4o", + "", + system_prompt=None, + history_messages=[], + messages=[ + {"role": "system", "content": system_prompt} if system_prompt else None, + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{image_data}" + }, + }, + ], + } + if image_data + else 
{"role": "user", "content": prompt}, + ], + api_key=api_key, + base_url=base_url, + **kwargs, + ) + if image_data + else openai_complete_if_cache( + "gpt-4o-mini", + prompt, + system_prompt=system_prompt, + history_messages=history_messages, + api_key=api_key, + base_url=base_url, + **kwargs, + ) + ) + + async def process_image_example(lightrag: LightRAG, vision_model_func): """Example of processing an image""" # Create image processor image_processor = ImageModalProcessor( - lightrag=lightrag, - modal_caption_func=vision_model_func + lightrag=lightrag, modal_caption_func=vision_model_func ) - + # Prepare image content image_content = { "img_path": "image.jpg", "img_caption": ["Example image caption"], - "img_footnote": ["Example image footnote"] + "img_footnote": ["Example image footnote"], } - + # Process image description, entity_info = await image_processor.process_multimodal_content( modal_content=image_content, content_type="image", file_path="image_example.jpg", - entity_name="Example Image" + entity_name="Example Image", ) - + print("Image Processing Results:") print(f"Description: {description}") print(f"Entity Info: {entity_info}") + async def process_table_example(lightrag: LightRAG, llm_model_func): """Example of processing a table""" # Create table processor table_processor = TableModalProcessor( - lightrag=lightrag, - modal_caption_func=llm_model_func + lightrag=lightrag, modal_caption_func=llm_model_func ) - + # Prepare table content table_content = { "table_body": """ @@ -105,47 +123,45 @@ async def process_table_example(lightrag: LightRAG, llm_model_func): | Mary | 30 | Designer | """, "table_caption": ["Employee Information Table"], - "table_footnote": ["Data updated as of 2024"] + "table_footnote": ["Data updated as of 2024"], } - + # Process table description, entity_info = await table_processor.process_multimodal_content( modal_content=table_content, content_type="table", file_path="table_example.md", - entity_name="Employee Table" + entity_name="Employee Table", ) - + print("\nTable Processing Results:") print(f"Description: {description}") print(f"Entity Info: {entity_info}") + async def process_equation_example(lightrag: LightRAG, llm_model_func): """Example of processing a mathematical equation""" # Create equation processor equation_processor = EquationModalProcessor( - lightrag=lightrag, - modal_caption_func=llm_model_func + lightrag=lightrag, modal_caption_func=llm_model_func ) - + # Prepare equation content - equation_content = { - "text": "E = mc^2", - "text_format": "LaTeX" - } - + equation_content = {"text": "E = mc^2", "text_format": "LaTeX"} + # Process equation description, entity_info = await equation_processor.process_multimodal_content( modal_content=equation_content, content_type="equation", file_path="equation_example.txt", - entity_name="Mass-Energy Equivalence" + entity_name="Mass-Energy Equivalence", ) - + print("\nEquation Processing Results:") print(f"Description: {description}") print(f"Entity Info: {entity_info}") + async def initialize_rag(api_key: str, base_url: str = None): rag = LightRAG( working_dir=WORKING_DIR, @@ -155,7 +171,10 @@ async def initialize_rag(api_key: str, base_url: str = None): api_key=api_key, base_url=base_url, ), - llm_model_func=lambda prompt, system_prompt=None, history_messages=[], **kwargs: openai_complete_if_cache( + llm_model_func=lambda prompt, + system_prompt=None, + history_messages=[], + **kwargs: openai_complete_if_cache( "gpt-4o-mini", prompt, system_prompt=system_prompt, @@ -171,30 +190,35 @@ async def 
initialize_rag(api_key: str, base_url: str = None): return rag + def main(): """Main function to run the example""" - parser = argparse.ArgumentParser(description='Modal Processors Example') - parser.add_argument('--api-key', required=True, help='OpenAI API key') - parser.add_argument('--base-url', help='Optional base URL for API') - parser.add_argument('--working-dir', '-w', default=WORKING_DIR, help='Working directory path') + parser = argparse.ArgumentParser(description="Modal Processors Example") + parser.add_argument("--api-key", required=True, help="OpenAI API key") + parser.add_argument("--base-url", help="Optional base URL for API") + parser.add_argument( + "--working-dir", "-w", default=WORKING_DIR, help="Working directory path" + ) args = parser.parse_args() # Run examples asyncio.run(main_async(args.api_key, args.base_url)) + async def main_async(api_key: str, base_url: str = None): # Initialize LightRAG lightrag = await initialize_rag(api_key, base_url) - + # Get model functions llm_model_func = get_llm_model_func(api_key, base_url) vision_model_func = get_vision_model_func(api_key, base_url) - + # Run examples await process_image_example(lightrag, vision_model_func) await process_table_example(lightrag, llm_model_func) await process_equation_example(lightrag, llm_model_func) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/examples/raganything_example.py b/examples/raganything_example.py index 8f56c81d..63e6e7a3 100644 --- a/examples/raganything_example.py +++ b/examples/raganything_example.py @@ -11,15 +11,20 @@ This example shows how to: import os import argparse import asyncio -from pathlib import Path -from lightrag.mineru_parser import MineruParser from lightrag.llm.openai import openai_complete_if_cache, openai_embed from lightrag.raganything import RAGAnything -async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_url: str = None, working_dir: str = None): + +async def process_with_rag( + file_path: str, + output_dir: str, + api_key: str, + base_url: str = None, + working_dir: str = None, +): """ Process document with RAGAnything - + Args: file_path: Path to the document output_dir: Output directory for RAG results @@ -30,7 +35,10 @@ async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_u # Initialize RAGAnything rag = RAGAnything( working_dir=working_dir, - llm_model_func=lambda prompt, system_prompt=None, history_messages=[], **kwargs: openai_complete_if_cache( + llm_model_func=lambda prompt, + system_prompt=None, + history_messages=[], + **kwargs: openai_complete_if_cache( "gpt-4o-mini", prompt, system_prompt=system_prompt, @@ -39,27 +47,40 @@ async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_u base_url=base_url, **kwargs, ), - vision_model_func=lambda prompt, system_prompt=None, history_messages=[], image_data=None, **kwargs: openai_complete_if_cache( + vision_model_func=lambda prompt, + system_prompt=None, + history_messages=[], + image_data=None, + **kwargs: openai_complete_if_cache( "gpt-4o", "", system_prompt=None, history_messages=[], messages=[ - {"role": "system", "content": system_prompt} if system_prompt else None, - {"role": "user", "content": [ - {"type": "text", "text": prompt}, - { - "type": "image_url", - "image_url": { - "url": f"data:image/jpeg;base64,{image_data}" - } - } - ]} if image_data else {"role": "user", "content": prompt} + {"role": "system", "content": system_prompt} + if system_prompt + else None, + { + "role": "user", + 
"content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{image_data}" + }, + }, + ], + } + if image_data + else {"role": "user", "content": prompt}, ], api_key=api_key, base_url=base_url, **kwargs, - ) if image_data else openai_complete_if_cache( + ) + if image_data + else openai_complete_if_cache( "gpt-4o-mini", prompt, system_prompt=system_prompt, @@ -75,21 +96,19 @@ async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_u base_url=base_url, ), embedding_dim=3072, - max_token_size=8192 + max_token_size=8192, ) # Process document await rag.process_document_complete( - file_path=file_path, - output_dir=output_dir, - parse_method="auto" + file_path=file_path, output_dir=output_dir, parse_method="auto" ) # Example queries queries = [ "What is the main content of the document?", "Describe the images and figures in the document", - "Tell me about the experimental results and data tables" + "Tell me about the experimental results and data tables", ] print("\nQuerying processed document:") @@ -101,14 +120,21 @@ async def process_with_rag(file_path: str, output_dir: str, api_key: str, base_u except Exception as e: print(f"Error processing with RAG: {str(e)}") + def main(): """Main function to run the example""" - parser = argparse.ArgumentParser(description='MinerU RAG Example') - parser.add_argument('file_path', help='Path to the document to process') - parser.add_argument('--working_dir', '-w', default="./rag_storage", help='Working directory path') - parser.add_argument('--output', '-o', default="./output", help='Output directory path') - parser.add_argument('--api-key', required=True, help='OpenAI API key for RAG processing') - parser.add_argument('--base-url', help='Optional base URL for API') + parser = argparse.ArgumentParser(description="MinerU RAG Example") + parser.add_argument("file_path", help="Path to the document to process") + parser.add_argument( + "--working_dir", "-w", default="./rag_storage", help="Working directory path" + ) + parser.add_argument( + "--output", "-o", default="./output", help="Output directory path" + ) + parser.add_argument( + "--api-key", required=True, help="OpenAI API key for RAG processing" + ) + parser.add_argument("--base-url", help="Optional base URL for API") args = parser.parse_args() @@ -117,13 +143,12 @@ def main(): os.makedirs(args.output, exist_ok=True) # Process with RAG - asyncio.run(process_with_rag( - args.file_path, - args.output, - args.api_key, - args.base_url, - args.working_dir - )) + asyncio.run( + process_with_rag( + args.file_path, args.output, args.api_key, args.base_url, args.working_dir + ) + ) -if __name__ == '__main__': - main() \ No newline at end of file + +if __name__ == "__main__": + main() diff --git a/lightrag/mineru_parser.py b/lightrag/mineru_parser.py index 52bc4962..1ce095b7 100644 --- a/lightrag/mineru_parser.py +++ b/lightrag/mineru_parser.py @@ -1,4 +1,4 @@ -# type: ignore +# type: ignore """ MinerU Document Parser Utility @@ -14,7 +14,18 @@ import os import json import argparse from pathlib import Path -from typing import Dict, List, Optional, Union, Tuple, Any, TypeVar, cast, TYPE_CHECKING, ClassVar +from typing import ( + Dict, + List, + Optional, + Union, + Tuple, + Any, + TypeVar, + cast, + TYPE_CHECKING, + ClassVar, +) # Type stubs for magic_pdf FileBasedDataWriter = Any @@ -28,20 +39,27 @@ read_local_office = Any read_local_images = Any if TYPE_CHECKING: - from magic_pdf.data.data_reader_writer import 
FileBasedDataWriter, FileBasedDataReader + from magic_pdf.data.data_reader_writer import ( + FileBasedDataWriter, + FileBasedDataReader, + ) from magic_pdf.data.dataset import PymuDocDataset from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze from magic_pdf.config.enums import SupportedPdfParseMethod from magic_pdf.data.read_api import read_local_office, read_local_images else: # MinerU imports - from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader + from magic_pdf.data.data_reader_writer import ( + FileBasedDataWriter, + FileBasedDataReader, + ) from magic_pdf.data.dataset import PymuDocDataset from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze from magic_pdf.config.enums import SupportedPdfParseMethod from magic_pdf.data.read_api import read_local_office, read_local_images -T = TypeVar('T') +T = TypeVar("T") + class MineruParser: """ @@ -58,7 +76,11 @@ class MineruParser: pass @staticmethod - def safe_write(writer: Any, content: Union[str, bytes, Dict[str, Any], List[Any]], filename: str) -> None: + def safe_write( + writer: Any, + content: Union[str, bytes, Dict[str, Any], List[Any]], + filename: str, + ) -> None: """ Safely write content to a file, ensuring the filename is valid @@ -80,15 +102,22 @@ class MineruParser: writer.write(content, filename) except TypeError: # If the writer expects bytes, convert string to bytes - writer.write(content.encode('utf-8'), filename) + writer.write(content.encode("utf-8"), filename) else: # For dict/list content, always encode as JSON string first if isinstance(content, (dict, list)): try: - writer.write(json.dumps(content, ensure_ascii=False, indent=4), filename) + writer.write( + json.dumps(content, ensure_ascii=False, indent=4), filename + ) except TypeError: # If the writer expects bytes, convert JSON string to bytes - writer.write(json.dumps(content, ensure_ascii=False, indent=4).encode('utf-8'), filename) + writer.write( + json.dumps(content, ensure_ascii=False, indent=4).encode( + "utf-8" + ), + filename, + ) else: # Regular content (assumed to be bytes or compatible) writer.write(content, filename) @@ -97,7 +126,7 @@ class MineruParser: def parse_pdf( pdf_path: Union[str, Path], output_dir: Optional[str] = None, - use_ocr: bool = False + use_ocr: bool = False, ) -> Tuple[List[Dict[str, Any]], str]: """ Parse PDF document @@ -150,9 +179,15 @@ class MineruParser: # Draw visualizations try: - infer_result.draw_model(os.path.join(local_md_dir, f"{name_without_suff}_model.pdf")) # type: ignore - pipe_result.draw_layout(os.path.join(local_md_dir, f"{name_without_suff}_layout.pdf")) # type: ignore - pipe_result.draw_span(os.path.join(local_md_dir, f"{name_without_suff}_spans.pdf")) # type: ignore + infer_result.draw_model( + os.path.join(local_md_dir, f"{name_without_suff}_model.pdf") + ) # type: ignore + pipe_result.draw_layout( + os.path.join(local_md_dir, f"{name_without_suff}_layout.pdf") + ) # type: ignore + pipe_result.draw_span( + os.path.join(local_md_dir, f"{name_without_suff}_spans.pdf") + ) # type: ignore except Exception as e: print(f"Warning: Failed to draw visualizations: {str(e)}") @@ -162,7 +197,9 @@ class MineruParser: # Save files using dump methods (consistent with API) pipe_result.dump_md(md_writer, f"{name_without_suff}.md", image_dir) # type: ignore - pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir) # type: ignore + pipe_result.dump_content_list( + md_writer, f"{name_without_suff}_content_list.json", image_dir + ) # 
type: ignore pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore # Save model result - convert JSON string to bytes before writing @@ -171,16 +208,24 @@ class MineruParser: try: # Try to write to a file manually to avoid FileBasedDataWriter issues - model_file_path = os.path.join(local_md_dir, f"{name_without_suff}_model.json") - with open(model_file_path, 'w', encoding='utf-8') as f: + model_file_path = os.path.join( + local_md_dir, f"{name_without_suff}_model.json" + ) + with open(model_file_path, "w", encoding="utf-8") as f: f.write(json_str) except Exception as e: - print(f"Warning: Failed to save model result using file write: {str(e)}") + print( + f"Warning: Failed to save model result using file write: {str(e)}" + ) try: # If direct file write fails, try using the writer with bytes encoding - md_writer.write(json_str.encode('utf-8'), f"{name_without_suff}_model.json") # type: ignore + md_writer.write( + json_str.encode("utf-8"), f"{name_without_suff}_model.json" + ) # type: ignore except Exception as e2: - print(f"Warning: Failed to save model result using writer: {str(e2)}") + print( + f"Warning: Failed to save model result using writer: {str(e2)}" + ) return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content)) @@ -190,8 +235,7 @@ class MineruParser: @staticmethod def parse_office_doc( - doc_path: Union[str, Path], - output_dir: Optional[str] = None + doc_path: Union[str, Path], output_dir: Optional[str] = None ) -> Tuple[List[Dict[str, Any]], str]: """ Parse office document (Word, PPT, etc.) @@ -231,9 +275,9 @@ class MineruParser: # Apply chain of operations according to API documentation # This follows the pattern shown in MS-Office example in the API docs - ds.apply(doc_analyze, ocr=True)\ - .pipe_txt_mode(image_writer)\ - .dump_md(md_writer, f"{name_without_suff}.md", image_dir) # type: ignore + ds.apply(doc_analyze, ocr=True).pipe_txt_mode(image_writer).dump_md( + md_writer, f"{name_without_suff}.md", image_dir + ) # type: ignore # Re-execute for getting the content data infer_result = ds.apply(doc_analyze, ocr=True) # type: ignore @@ -244,7 +288,9 @@ class MineruParser: content_list = pipe_result.get_content_list(image_dir) # type: ignore # Save additional output files - pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir) # type: ignore + pipe_result.dump_content_list( + md_writer, f"{name_without_suff}_content_list.json", image_dir + ) # type: ignore pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore # Save model result - convert JSON string to bytes before writing @@ -253,16 +299,24 @@ class MineruParser: try: # Try to write to a file manually to avoid FileBasedDataWriter issues - model_file_path = os.path.join(local_md_dir, f"{name_without_suff}_model.json") - with open(model_file_path, 'w', encoding='utf-8') as f: + model_file_path = os.path.join( + local_md_dir, f"{name_without_suff}_model.json" + ) + with open(model_file_path, "w", encoding="utf-8") as f: f.write(json_str) except Exception as e: - print(f"Warning: Failed to save model result using file write: {str(e)}") + print( + f"Warning: Failed to save model result using file write: {str(e)}" + ) try: # If direct file write fails, try using the writer with bytes encoding - md_writer.write(json_str.encode('utf-8'), f"{name_without_suff}_model.json") # type: ignore + md_writer.write( + json_str.encode("utf-8"), f"{name_without_suff}_model.json" + ) # type: ignore except Exception as 
e2: - print(f"Warning: Failed to save model result using writer: {str(e2)}") + print( + f"Warning: Failed to save model result using writer: {str(e2)}" + ) return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content)) @@ -272,8 +326,7 @@ class MineruParser: @staticmethod def parse_image( - image_path: Union[str, Path], - output_dir: Optional[str] = None + image_path: Union[str, Path], output_dir: Optional[str] = None ) -> Tuple[List[Dict[str, Any]], str]: """ Parse image document @@ -313,9 +366,9 @@ class MineruParser: # Apply chain of operations according to API documentation # This follows the pattern shown in Image example in the API docs - ds.apply(doc_analyze, ocr=True)\ - .pipe_ocr_mode(image_writer)\ - .dump_md(md_writer, f"{name_without_suff}.md", image_dir) # type: ignore + ds.apply(doc_analyze, ocr=True).pipe_ocr_mode(image_writer).dump_md( + md_writer, f"{name_without_suff}.md", image_dir + ) # type: ignore # Re-execute for getting the content data infer_result = ds.apply(doc_analyze, ocr=True) # type: ignore @@ -326,7 +379,9 @@ class MineruParser: content_list = pipe_result.get_content_list(image_dir) # type: ignore # Save additional output files - pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir) # type: ignore + pipe_result.dump_content_list( + md_writer, f"{name_without_suff}_content_list.json", image_dir + ) # type: ignore pipe_result.dump_middle_json(md_writer, f"{name_without_suff}_middle.json") # type: ignore # Save model result - convert JSON string to bytes before writing @@ -335,16 +390,24 @@ class MineruParser: try: # Try to write to a file manually to avoid FileBasedDataWriter issues - model_file_path = os.path.join(local_md_dir, f"{name_without_suff}_model.json") - with open(model_file_path, 'w', encoding='utf-8') as f: + model_file_path = os.path.join( + local_md_dir, f"{name_without_suff}_model.json" + ) + with open(model_file_path, "w", encoding="utf-8") as f: f.write(json_str) except Exception as e: - print(f"Warning: Failed to save model result using file write: {str(e)}") + print( + f"Warning: Failed to save model result using file write: {str(e)}" + ) try: # If direct file write fails, try using the writer with bytes encoding - md_writer.write(json_str.encode('utf-8'), f"{name_without_suff}_model.json") # type: ignore + md_writer.write( + json_str.encode("utf-8"), f"{name_without_suff}_model.json" + ) # type: ignore except Exception as e2: - print(f"Warning: Failed to save model result using writer: {str(e2)}") + print( + f"Warning: Failed to save model result using writer: {str(e2)}" + ) return cast(Tuple[List[Dict[str, Any]], str], (content_list, md_content)) @@ -357,7 +420,7 @@ class MineruParser: file_path: Union[str, Path], parse_method: str = "auto", output_dir: Optional[str] = None, - save_results: bool = True + save_results: bool = True, ) -> Tuple[List[Dict[str, Any]], str]: """ Parse document using MinerU based on file extension @@ -382,64 +445,59 @@ class MineruParser: # Choose appropriate parser based on file type if ext in [".pdf"]: return MineruParser.parse_pdf( - file_path, - output_dir, - use_ocr=(parse_method == "ocr") + file_path, output_dir, use_ocr=(parse_method == "ocr") ) elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"]: - return MineruParser.parse_image( - file_path, - output_dir - ) + return MineruParser.parse_image(file_path, output_dir) elif ext in [".doc", ".docx", ".ppt", ".pptx"]: - return MineruParser.parse_office_doc( - file_path, - output_dir - ) + return 
MineruParser.parse_office_doc(file_path, output_dir) else: # For unsupported file types, default to PDF parsing - print(f"Warning: Unsupported file extension '{ext}', trying generic PDF parser") - return MineruParser.parse_pdf( - file_path, - output_dir, - use_ocr=(parse_method == "ocr") + print( + f"Warning: Unsupported file extension '{ext}', trying generic PDF parser" ) + return MineruParser.parse_pdf( + file_path, output_dir, use_ocr=(parse_method == "ocr") + ) + def main(): """ Main function to run the MinerU parser from command line """ - parser = argparse.ArgumentParser(description='Parse documents using MinerU') - parser.add_argument('file_path', help='Path to the document to parse') - parser.add_argument('--output', '-o', help='Output directory path') - parser.add_argument('--method', '-m', - choices=['auto', 'ocr', 'txt'], - default='auto', - help='Parsing method (auto, ocr, txt)') - parser.add_argument('--stats', action='store_true', - help='Display content statistics') + parser = argparse.ArgumentParser(description="Parse documents using MinerU") + parser.add_argument("file_path", help="Path to the document to parse") + parser.add_argument("--output", "-o", help="Output directory path") + parser.add_argument( + "--method", + "-m", + choices=["auto", "ocr", "txt"], + default="auto", + help="Parsing method (auto, ocr, txt)", + ) + parser.add_argument( + "--stats", action="store_true", help="Display content statistics" + ) args = parser.parse_args() try: # Parse the document content_list, md_content = MineruParser.parse_document( - file_path=args.file_path, - parse_method=args.method, - output_dir=args.output + file_path=args.file_path, parse_method=args.method, output_dir=args.output ) # Display statistics if requested if args.stats: print("\nDocument Statistics:") print(f"Total content blocks: {len(content_list)}") - + # Count different types of content content_types = {} for item in content_list: - content_type = item.get('type', 'unknown') + content_type = item.get("type", "unknown") content_types[content_type] = content_types.get(content_type, 0) + 1 - + print("\nContent Type Distribution:") for content_type, count in content_types.items(): print(f"- {content_type}: {count}") @@ -450,5 +508,6 @@ def main(): return 0 -if __name__ == '__main__': + +if __name__ == "__main__": exit(main()) diff --git a/lightrag/modalprocessors.py b/lightrag/modalprocessors.py index e4cb0a37..1ac31ca1 100644 --- a/lightrag/modalprocessors.py +++ b/lightrag/modalprocessors.py @@ -31,7 +31,7 @@ class BaseModalProcessor: def __init__(self, lightrag: LightRAG, modal_caption_func): """Initialize base processor - + Args: lightrag: LightRAG instance modal_caption_func: Function for generating descriptions @@ -65,8 +65,8 @@ class BaseModalProcessor: raise NotImplementedError("Subclasses must implement this method") async def _create_entity_and_chunk( - self, modal_chunk: str, entity_info: Dict[str, Any], - file_path: str) -> Tuple[str, Dict[str, Any]]: + self, modal_chunk: str, entity_info: Dict[str, Any], file_path: str + ) -> Tuple[str, Dict[str, Any]]: """Create entity and text chunk""" # Create chunk chunk_id = compute_mdhash_id(str(modal_chunk), prefix="chunk-") @@ -93,16 +93,16 @@ class BaseModalProcessor: "created_at": int(time.time()), } - await self.knowledge_graph_inst.upsert_node(entity_info["entity_name"], - node_data) + await self.knowledge_graph_inst.upsert_node( + entity_info["entity_name"], node_data + ) # Insert entity into vector database entity_vdb_data = { 
compute_mdhash_id(entity_info["entity_name"], prefix="ent-"): { "entity_name": entity_info["entity_name"], "entity_type": entity_info["entity_type"], - "content": - f"{entity_info['entity_name']}\n{entity_info['summary']}", + "content": f"{entity_info['entity_name']}\n{entity_info['summary']}", "source_id": chunk_id, "file_path": file_path, } @@ -110,8 +110,7 @@ class BaseModalProcessor: await self.entities_vdb.upsert(entity_vdb_data) # Process entity and relationship extraction - await self._process_chunk_for_extraction(chunk_id, - entity_info["entity_name"]) + await self._process_chunk_for_extraction(chunk_id, entity_info["entity_name"]) # Ensure all storage updates are complete await self._insert_done() @@ -120,11 +119,12 @@ class BaseModalProcessor: "entity_name": entity_info["entity_name"], "entity_type": entity_info["entity_type"], "description": entity_info["summary"], - "chunk_id": chunk_id + "chunk_id": chunk_id, } - async def _process_chunk_for_extraction(self, chunk_id: str, - modal_entity_name: str): + async def _process_chunk_for_extraction( + self, chunk_id: str, modal_entity_name: str + ): """Process chunk for entity and relationship extraction""" chunk_data = await self.text_chunks_db.get_by_id(chunk_id) if not chunk_data: @@ -168,37 +168,27 @@ class BaseModalProcessor: if entity_name != modal_entity_name: # Skip self-relationship # Create belongs_to relationship relation_data = { - "description": - f"Entity {entity_name} belongs to {modal_entity_name}", - "keywords": - "belongs_to,part_of,contained_in", - "source_id": - chunk_id, - "weight": - 10.0, - "file_path": - chunk_data.get("file_path", "manual_creation"), + "description": f"Entity {entity_name} belongs to {modal_entity_name}", + "keywords": "belongs_to,part_of,contained_in", + "source_id": chunk_id, + "weight": 10.0, + "file_path": chunk_data.get("file_path", "manual_creation"), } await self.knowledge_graph_inst.upsert_edge( - entity_name, modal_entity_name, relation_data) + entity_name, modal_entity_name, relation_data + ) - relation_id = compute_mdhash_id(entity_name + - modal_entity_name, - prefix="rel-") + relation_id = compute_mdhash_id( + entity_name + modal_entity_name, prefix="rel-" + ) relation_vdb_data = { relation_id: { - "src_id": - entity_name, - "tgt_id": - modal_entity_name, - "keywords": - relation_data["keywords"], - "content": - f"{relation_data['keywords']}\t{entity_name}\n{modal_entity_name}\n{relation_data['description']}", - "source_id": - chunk_id, - "file_path": - chunk_data.get("file_path", "manual_creation"), + "src_id": entity_name, + "tgt_id": modal_entity_name, + "keywords": relation_data["keywords"], + "content": f"{relation_data['keywords']}\t{entity_name}\n{modal_entity_name}\n{relation_data['description']}", + "source_id": chunk_id, + "file_path": chunk_data.get("file_path", "manual_creation"), } } await self.relationships_vdb.upsert(relation_vdb_data) @@ -215,16 +205,18 @@ class BaseModalProcessor: ) async def _insert_done(self) -> None: - await asyncio.gather(*[ - cast(StorageNameSpace, storage_inst).index_done_callback() - for storage_inst in [ - self.text_chunks_db, - self.chunks_vdb, - self.entities_vdb, - self.relationships_vdb, - self.knowledge_graph_inst, + await asyncio.gather( + *[ + cast(StorageNameSpace, storage_inst).index_done_callback() + for storage_inst in [ + self.text_chunks_db, + self.chunks_vdb, + self.entities_vdb, + self.relationships_vdb, + self.knowledge_graph_inst, + ] ] - ]) + ) class ImageModalProcessor(BaseModalProcessor): @@ -232,7 +224,7 @@ class 
ImageModalProcessor(BaseModalProcessor): def __init__(self, lightrag: LightRAG, modal_caption_func): """Initialize image processor - + Args: lightrag: LightRAG instance modal_caption_func: Function for generating descriptions (supporting image understanding) @@ -243,8 +235,7 @@ class ImageModalProcessor(BaseModalProcessor): """Encode image to base64""" try: with open(image_path, "rb") as image_file: - encoded_string = base64.b64encode( - image_file.read()).decode('utf-8') + encoded_string = base64.b64encode(image_file.read()).decode("utf-8") return encoded_string except Exception as e: logger.error(f"Failed to encode image {image_path}: {e}") @@ -309,13 +300,12 @@ class ImageModalProcessor(BaseModalProcessor): response = await self.modal_caption_func( vision_prompt, image_data=image_base64, - system_prompt= - "You are an expert image analyst. Provide detailed, accurate descriptions." + system_prompt="You are an expert image analyst. Provide detailed, accurate descriptions.", ) else: # Analyze based on existing text information text_prompt = f"""Based on the following image information, provide analysis: - + Image Path: {image_path} Captions: {captions} Footnotes: {footnotes} @@ -324,13 +314,11 @@ class ImageModalProcessor(BaseModalProcessor): response = await self.modal_caption_func( text_prompt, - system_prompt= - "You are an expert image analyst. Provide detailed analysis based on available information." + system_prompt="You are an expert image analyst. Provide detailed analysis based on available information.", ) # Parse response - enhanced_caption, entity_info = self._parse_response( - response, entity_name) + enhanced_caption, entity_info = self._parse_response(response, entity_name) # Build complete image content modal_chunk = f""" @@ -341,27 +329,30 @@ class ImageModalProcessor(BaseModalProcessor): Visual Analysis: {enhanced_caption}""" - return await self._create_entity_and_chunk(modal_chunk, - entity_info, file_path) + return await self._create_entity_and_chunk( + modal_chunk, entity_info, file_path + ) except Exception as e: logger.error(f"Error processing image content: {e}") # Fallback processing fallback_entity = { - "entity_name": entity_name if entity_name else - f"image_{compute_mdhash_id(str(modal_content))}", + "entity_name": entity_name + if entity_name + else f"image_{compute_mdhash_id(str(modal_content))}", "entity_type": "image", - "summary": f"Image content: {str(modal_content)[:100]}" + "summary": f"Image content: {str(modal_content)[:100]}", } return str(modal_content), fallback_entity - def _parse_response(self, - response: str, - entity_name: str = None) -> Tuple[str, Dict[str, Any]]: + def _parse_response( + self, response: str, entity_name: str = None + ) -> Tuple[str, Dict[str, Any]]: """Parse model response""" try: response_data = json.loads( - re.search(r"\{.*\}", response, re.DOTALL).group(0)) + re.search(r"\{.*\}", response, re.DOTALL).group(0) + ) description = response_data.get("detailed_description", "") entity_data = response_data.get("entity_info", {}) @@ -369,11 +360,14 @@ class ImageModalProcessor(BaseModalProcessor): if not description or not entity_data: raise ValueError("Missing required fields in response") - if not all(key in entity_data - for key in ["entity_name", "entity_type", "summary"]): + if not all( + key in entity_data for key in ["entity_name", "entity_type", "summary"] + ): raise ValueError("Missing required fields in entity_info") - entity_data["entity_name"] = entity_data["entity_name"] + f" ({entity_data['entity_type']})" + 
entity_data["entity_name"] = ( + entity_data["entity_name"] + f" ({entity_data['entity_type']})" + ) if entity_name: entity_data["entity_name"] = entity_name @@ -382,13 +376,11 @@ class ImageModalProcessor(BaseModalProcessor): except (json.JSONDecodeError, AttributeError, ValueError) as e: logger.error(f"Error parsing image analysis response: {e}") fallback_entity = { - "entity_name": - entity_name - if entity_name else f"image_{compute_mdhash_id(response)}", - "entity_type": - "image", - "summary": - response[:100] + "..." if len(response) > 100 else response + "entity_name": entity_name + if entity_name + else f"image_{compute_mdhash_id(response)}", + "entity_type": "image", + "summary": response[:100] + "..." if len(response) > 100 else response, } return response, fallback_entity @@ -447,15 +439,15 @@ class TableModalProcessor(BaseModalProcessor): response = await self.modal_caption_func( table_prompt, - system_prompt= - "You are an expert data analyst. Provide detailed table analysis with specific insights." + system_prompt="You are an expert data analyst. Provide detailed table analysis with specific insights.", ) # Parse response enhanced_caption, entity_info = self._parse_table_response( - response, entity_name) - - #TODO: Add Retry Mechanism + response, entity_name + ) + + # TODO: Add Retry Mechanism # Build complete table content modal_chunk = f"""Table Analysis: @@ -466,17 +458,16 @@ class TableModalProcessor(BaseModalProcessor): Analysis: {enhanced_caption}""" - return await self._create_entity_and_chunk(modal_chunk, entity_info, - file_path) + return await self._create_entity_and_chunk(modal_chunk, entity_info, file_path) def _parse_table_response( - self, - response: str, - entity_name: str = None) -> Tuple[str, Dict[str, Any]]: + self, response: str, entity_name: str = None + ) -> Tuple[str, Dict[str, Any]]: """Parse table analysis response""" try: response_data = json.loads( - re.search(r"\{.*\}", response, re.DOTALL).group(0)) + re.search(r"\{.*\}", response, re.DOTALL).group(0) + ) description = response_data.get("detailed_description", "") entity_data = response_data.get("entity_info", {}) @@ -484,11 +475,14 @@ class TableModalProcessor(BaseModalProcessor): if not description or not entity_data: raise ValueError("Missing required fields in response") - if not all(key in entity_data - for key in ["entity_name", "entity_type", "summary"]): + if not all( + key in entity_data for key in ["entity_name", "entity_type", "summary"] + ): raise ValueError("Missing required fields in entity_info") - entity_data["entity_name"] = entity_data["entity_name"] + f" ({entity_data['entity_type']})" + entity_data["entity_name"] = ( + entity_data["entity_name"] + f" ({entity_data['entity_type']})" + ) if entity_name: entity_data["entity_name"] = entity_name @@ -497,13 +491,11 @@ class TableModalProcessor(BaseModalProcessor): except (json.JSONDecodeError, AttributeError, ValueError) as e: logger.error(f"Error parsing table analysis response: {e}") fallback_entity = { - "entity_name": - entity_name - if entity_name else f"table_{compute_mdhash_id(response)}", - "entity_type": - "table", - "summary": - response[:100] + "..." if len(response) > 100 else response + "entity_name": entity_name + if entity_name + else f"table_{compute_mdhash_id(response)}", + "entity_type": "table", + "summary": response[:100] + "..." 
if len(response) > 100 else response, } return response, fallback_entity @@ -559,13 +551,13 @@ class EquationModalProcessor(BaseModalProcessor): response = await self.modal_caption_func( equation_prompt, - system_prompt= - "You are an expert mathematician. Provide detailed mathematical analysis." + system_prompt="You are an expert mathematician. Provide detailed mathematical analysis.", ) # Parse response enhanced_caption, entity_info = self._parse_equation_response( - response, entity_name) + response, entity_name + ) # Build complete equation content modal_chunk = f"""Mathematical Equation Analysis: @@ -574,17 +566,16 @@ class EquationModalProcessor(BaseModalProcessor): Mathematical Analysis: {enhanced_caption}""" - return await self._create_entity_and_chunk(modal_chunk, entity_info, - file_path) + return await self._create_entity_and_chunk(modal_chunk, entity_info, file_path) def _parse_equation_response( - self, - response: str, - entity_name: str = None) -> Tuple[str, Dict[str, Any]]: + self, response: str, entity_name: str = None + ) -> Tuple[str, Dict[str, Any]]: """Parse equation analysis response""" try: response_data = json.loads( - re.search(r"\{.*\}", response, re.DOTALL).group(0)) + re.search(r"\{.*\}", response, re.DOTALL).group(0) + ) description = response_data.get("detailed_description", "") entity_data = response_data.get("entity_info", {}) @@ -592,11 +583,14 @@ class EquationModalProcessor(BaseModalProcessor): if not description or not entity_data: raise ValueError("Missing required fields in response") - if not all(key in entity_data - for key in ["entity_name", "entity_type", "summary"]): + if not all( + key in entity_data for key in ["entity_name", "entity_type", "summary"] + ): raise ValueError("Missing required fields in entity_info") - entity_data["entity_name"] = entity_data["entity_name"] + f" ({entity_data['entity_type']})" + entity_data["entity_name"] = ( + entity_data["entity_name"] + f" ({entity_data['entity_type']})" + ) if entity_name: entity_data["entity_name"] = entity_name @@ -605,13 +599,11 @@ class EquationModalProcessor(BaseModalProcessor): except (json.JSONDecodeError, AttributeError, ValueError) as e: logger.error(f"Error parsing equation analysis response: {e}") fallback_entity = { - "entity_name": - entity_name - if entity_name else f"equation_{compute_mdhash_id(response)}", - "entity_type": - "equation", - "summary": - response[:100] + "..." if len(response) > 100 else response + "entity_name": entity_name + if entity_name + else f"equation_{compute_mdhash_id(response)}", + "entity_type": "equation", + "summary": response[:100] + "..." if len(response) > 100 else response, } return response, fallback_entity @@ -651,13 +643,13 @@ class GenericModalProcessor(BaseModalProcessor): response = await self.modal_caption_func( generic_prompt, - system_prompt= - f"You are an expert content analyst specializing in {content_type} content." 
+ system_prompt=f"You are an expert content analyst specializing in {content_type} content.", ) # Parse response enhanced_caption, entity_info = self._parse_generic_response( - response, entity_name, content_type) + response, entity_name, content_type + ) # Build complete content modal_chunk = f"""{content_type.title()} Content Analysis: @@ -665,18 +657,16 @@ class GenericModalProcessor(BaseModalProcessor): Analysis: {enhanced_caption}""" - return await self._create_entity_and_chunk(modal_chunk, entity_info, - file_path) + return await self._create_entity_and_chunk(modal_chunk, entity_info, file_path) def _parse_generic_response( - self, - response: str, - entity_name: str = None, - content_type: str = "content") -> Tuple[str, Dict[str, Any]]: + self, response: str, entity_name: str = None, content_type: str = "content" + ) -> Tuple[str, Dict[str, Any]]: """Parse generic analysis response""" try: response_data = json.loads( - re.search(r"\{.*\}", response, re.DOTALL).group(0)) + re.search(r"\{.*\}", response, re.DOTALL).group(0) + ) description = response_data.get("detailed_description", "") entity_data = response_data.get("entity_info", {}) @@ -684,11 +674,14 @@ class GenericModalProcessor(BaseModalProcessor): if not description or not entity_data: raise ValueError("Missing required fields in response") - if not all(key in entity_data - for key in ["entity_name", "entity_type", "summary"]): + if not all( + key in entity_data for key in ["entity_name", "entity_type", "summary"] + ): raise ValueError("Missing required fields in entity_info") - entity_data["entity_name"] = entity_data["entity_name"] + f" ({entity_data['entity_type']})" + entity_data["entity_name"] = ( + entity_data["entity_name"] + f" ({entity_data['entity_type']})" + ) if entity_name: entity_data["entity_name"] = entity_name @@ -697,12 +690,10 @@ class GenericModalProcessor(BaseModalProcessor): except (json.JSONDecodeError, AttributeError, ValueError) as e: logger.error(f"Error parsing generic analysis response: {e}") fallback_entity = { - "entity_name": - entity_name if entity_name else - f"{content_type}_{compute_mdhash_id(response)}", - "entity_type": - content_type, - "summary": - response[:100] + "..." if len(response) > 100 else response + "entity_name": entity_name + if entity_name + else f"{content_type}_{compute_mdhash_id(response)}", + "entity_type": content_type, + "summary": response[:100] + "..." 
if len(response) > 100 else response, } return response, fallback_entity diff --git a/lightrag/raganything.py b/lightrag/raganything.py index af98a104..55343302 100644 --- a/lightrag/raganything.py +++ b/lightrag/raganything.py @@ -26,15 +26,15 @@ from lightrag.mineru_parser import MineruParser # Import specialized processors from lightrag.modalprocessors import ( ImageModalProcessor, - TableModalProcessor, + TableModalProcessor, EquationModalProcessor, - GenericModalProcessor + GenericModalProcessor, ) class RAGAnything: """Multimodal Document Processing Pipeline - Complete document parsing and insertion pipeline""" - + def __init__( self, lightrag: Optional[LightRAG] = None, @@ -43,11 +43,11 @@ class RAGAnything: embedding_func: Optional[Callable] = None, working_dir: str = "./rag_storage", embedding_dim: int = 3072, - max_token_size: int = 8192 + max_token_size: int = 8192, ): """ Initialize Multimodal Document Processing Pipeline - + Args: lightrag: Optional pre-initialized LightRAG instance llm_model_func: LLM model function for text analysis @@ -63,64 +63,67 @@ class RAGAnything: self.embedding_func = embedding_func self.embedding_dim = embedding_dim self.max_token_size = max_token_size - + # Set up logging setup_logger("RAGAnything") self.logger = logging.getLogger("RAGAnything") - + # Create working directory if needed if not os.path.exists(working_dir): os.makedirs(working_dir) - + # Use provided LightRAG or mark for later initialization self.lightrag = lightrag self.modal_processors = {} - + # If LightRAG is provided, initialize processors immediately if self.lightrag is not None: self._initialize_processors() - + def _initialize_processors(self): """Initialize multimodal processors with appropriate model functions""" if self.lightrag is None: - raise ValueError("LightRAG instance must be initialized before creating processors") - + raise ValueError( + "LightRAG instance must be initialized before creating processors" + ) + # Create different multimodal processors self.modal_processors = { "image": ImageModalProcessor( lightrag=self.lightrag, - modal_caption_func=self.vision_model_func or self.llm_model_func + modal_caption_func=self.vision_model_func or self.llm_model_func, ), "table": TableModalProcessor( - lightrag=self.lightrag, - modal_caption_func=self.llm_model_func + lightrag=self.lightrag, modal_caption_func=self.llm_model_func ), "equation": EquationModalProcessor( - lightrag=self.lightrag, - modal_caption_func=self.llm_model_func + lightrag=self.lightrag, modal_caption_func=self.llm_model_func ), "generic": GenericModalProcessor( - lightrag=self.lightrag, - modal_caption_func=self.llm_model_func - ) + lightrag=self.lightrag, modal_caption_func=self.llm_model_func + ), } - + self.logger.info("Multimodal processors initialized") self.logger.info(f"Available processors: {list(self.modal_processors.keys())}") - + async def _ensure_lightrag_initialized(self): """Ensure LightRAG instance is initialized, create if necessary""" if self.lightrag is not None: return - + # Validate required functions if self.llm_model_func is None: - raise ValueError("llm_model_func must be provided when LightRAG is not pre-initialized") + raise ValueError( + "llm_model_func must be provided when LightRAG is not pre-initialized" + ) if self.embedding_func is None: - raise ValueError("embedding_func must be provided when LightRAG is not pre-initialized") - + raise ValueError( + "embedding_func must be provided when LightRAG is not pre-initialized" + ) + from lightrag.kg.shared_storage import 
             initialize_pipeline_status
-
+
         # Create LightRAG instance with provided functions
         self.lightrag = LightRAG(
             working_dir=self.working_dir,
@@ -134,88 +137,86 @@ class RAGAnything:
         await self.lightrag.initialize_storages()
         await initialize_pipeline_status()
-
+
         # Initialize processors after LightRAG is ready
         self._initialize_processors()
-
+
         self.logger.info("LightRAG and multimodal processors initialized")

     def parse_document(
-        self,
-        file_path: str,
+        self,
+        file_path: str,
         output_dir: str = "./output",
         parse_method: str = "auto",
-        display_stats: bool = True
+        display_stats: bool = True,
     ) -> Tuple[List[Dict[str, Any]], str]:
         """
         Parse document using MinerU
-
+
         Args:
             file_path: Path to the file to parse
             output_dir: Output directory
             parse_method: Parse method ("auto", "ocr", "txt")
             display_stats: Whether to display content statistics
-
+
         Returns:
             (content_list, md_content): Content list and markdown text
         """
         self.logger.info(f"Starting document parsing: {file_path}")
-
+
         file_path = Path(file_path)
         if not file_path.exists():
             raise FileNotFoundError(f"File not found: {file_path}")
-
+
         # Choose appropriate parsing method based on file extension
         ext = file_path.suffix.lower()
-
+
         try:
             if ext in [".pdf"]:
-                self.logger.info(f"Detected PDF file, using PDF parser (OCR={parse_method == 'ocr'})...")
+                self.logger.info(
+                    f"Detected PDF file, using PDF parser (OCR={parse_method == 'ocr'})..."
+                )
                 content_list, md_content = MineruParser.parse_pdf(
-                    file_path,
-                    output_dir,
-                    use_ocr=(parse_method == "ocr")
+                    file_path, output_dir, use_ocr=(parse_method == "ocr")
                 )
             elif ext in [".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif"]:
                 self.logger.info("Detected image file, using image parser...")
                 content_list, md_content = MineruParser.parse_image(
-                    file_path,
-                    output_dir
+                    file_path, output_dir
                 )
             elif ext in [".doc", ".docx", ".ppt", ".pptx"]:
                 self.logger.info("Detected Office document, using Office parser...")
                 content_list, md_content = MineruParser.parse_office_doc(
-                    file_path,
-                    output_dir
+                    file_path, output_dir
                 )
             else:
                 # For other or unknown formats, use generic parser
-                self.logger.info(f"Using generic parser for {ext} file (method={parse_method})...")
-                content_list, md_content = MineruParser.parse_document(
-                    file_path,
-                    parse_method=parse_method,
-                    output_dir=output_dir
+                self.logger.info(
+                    f"Using generic parser for {ext} file (method={parse_method})..."
                 )
-
+                content_list, md_content = MineruParser.parse_document(
+                    file_path, parse_method=parse_method, output_dir=output_dir
+                )
+
         except Exception as e:
             self.logger.error(f"Error during parsing with specific parser: {str(e)}")
             self.logger.warning("Falling back to generic parser...")
             # If specific parser fails, fall back to generic parser
             content_list, md_content = MineruParser.parse_document(
-                file_path,
-                parse_method=parse_method,
-                output_dir=output_dir
+                file_path, parse_method=parse_method, output_dir=output_dir
             )
-
-        self.logger.info(f"Parsing complete! Extracted {len(content_list)} content blocks")
+
+        self.logger.info(
+            f"Parsing complete! Extracted {len(content_list)} content blocks"
+        )
         self.logger.info(f"Markdown text length: {len(md_content)} characters")
-
+
         # Display content statistics if requested
         if display_stats:
             self.logger.info("\nContent Information:")
             self.logger.info(f"* Total blocks in content_list: {len(content_list)}")
             self.logger.info(f"* Markdown content length: {len(md_content)} characters")
-
+
             # Count elements by type
             block_types: Dict[str, int] = {}
             for block in content_list:
@@ -223,29 +224,31 @@ class RAGAnything:
                 block_type = block.get("type", "unknown")
                 if isinstance(block_type, str):
                     block_types[block_type] = block_types.get(block_type, 0) + 1
-
+
             self.logger.info("* Content block types:")
             for block_type, count in block_types.items():
                 self.logger.info(f"  - {block_type}: {count}")
-
+
         return content_list, md_content

-    def _separate_content(self, content_list: List[Dict[str, Any]]) -> Tuple[str, List[Dict[str, Any]]]:
+    def _separate_content(
+        self, content_list: List[Dict[str, Any]]
+    ) -> Tuple[str, List[Dict[str, Any]]]:
         """
         Separate text content and multimodal content
-
+
         Args:
             content_list: Content list from MinerU parsing
-
+
         Returns:
             (text_content, multimodal_items): Pure text content and multimodal items list
         """
         text_parts = []
         multimodal_items = []
-
+
         for item in content_list:
             content_type = item.get("type", "text")
-
+
             if content_type == "text":
                 # Text content
                 text = item.get("text", "")
@@ -254,27 +257,27 @@ class RAGAnything:
             else:
                 # Multimodal content (image, table, equation, etc.)
                 multimodal_items.append(item)
-
+
         # Merge all text content
         text_content = "\n\n".join(text_parts)
-
-        self.logger.info(f"Content separation complete:")
+
+        self.logger.info("Content separation complete:")
         self.logger.info(f"  - Text content length: {len(text_content)} characters")
         self.logger.info(f"  - Multimodal items count: {len(multimodal_items)}")
-
+
         # Count multimodal types
         modal_types = {}
         for item in multimodal_items:
             modal_type = item.get("type", "unknown")
             modal_types[modal_type] = modal_types.get(modal_type, 0) + 1
-
+
         if modal_types:
             self.logger.info(f"  - Multimodal type distribution: {modal_types}")
-
+
         return text_content, multimodal_items

     async def _insert_text_content(
-        self,
+        self,
         input: str | list[str],
         split_by_character: str | None = None,
         split_by_character_only: bool = False,
@@ -283,7 +286,7 @@ class RAGAnything:
     ):
         """
         Insert pure text content into LightRAG
-
+
         Args:
             input: Single document string or list of document strings
             split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
@@ -292,24 +295,26 @@ class RAGAnything:
                 split_by_character is None, this parameter is ignored.
            ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
            file_paths: single string of the file path or list of file paths, used for citation
-        """
+        """
         self.logger.info("Starting text content insertion into LightRAG...")
-
+
         # Use LightRAG's insert method with all parameters
         await self.lightrag.ainsert(
             input=input,
             file_paths=file_paths,
             split_by_character=split_by_character,
             split_by_character_only=split_by_character_only,
-            ids=ids
+            ids=ids,
         )
-
+
         self.logger.info("Text content insertion complete")

-    async def _process_multimodal_content(self, multimodal_items: List[Dict[str, Any]], file_path: str):
+    async def _process_multimodal_content(
+        self, multimodal_items: List[Dict[str, Any]], file_path: str
+    ):
         """
         Process multimodal content (using specialized processors)
-
+
         Args:
             multimodal_items: List of multimodal items
             file_path: File path (for reference)
@@ -317,43 +322,52 @@ class RAGAnything:
         if not multimodal_items:
             self.logger.debug("No multimodal content to process")
             return
-
+
         self.logger.info("Starting multimodal content processing...")
-
+
         file_name = os.path.basename(file_path)
-
+
         for i, item in enumerate(multimodal_items):
             try:
                 content_type = item.get("type", "unknown")
-                self.logger.info(f"Processing item {i+1}/{len(multimodal_items)}: {content_type} content")
-
+                self.logger.info(
+                    f"Processing item {i+1}/{len(multimodal_items)}: {content_type} content"
+                )
+
                 # Select appropriate processor
                 processor = self._get_processor_for_type(content_type)
-
+
                 if processor:
-                    enhanced_caption, entity_info = await processor.process_multimodal_content(
+                    (
+                        enhanced_caption,
+                        entity_info,
+                    ) = await processor.process_multimodal_content(
                         modal_content=item,
                         content_type=content_type,
-                        file_path=file_name
+                        file_path=file_name,
+                    )
+                    self.logger.info(
+                        f"{content_type} processing complete: {entity_info.get('entity_name', 'Unknown')}"
                     )
-                    self.logger.info(f"{content_type} processing complete: {entity_info.get('entity_name', 'Unknown')}")
                 else:
-                    self.logger.warning(f"No suitable processor found for {content_type} type content")
-
+                    self.logger.warning(
+                        f"No suitable processor found for {content_type} type content"
+                    )
+
             except Exception as e:
                 self.logger.error(f"Error processing multimodal content: {str(e)}")
                 self.logger.debug("Exception details:", exc_info=True)
                 continue
-
+
         self.logger.info("Multimodal content processing complete")

     def _get_processor_for_type(self, content_type: str):
         """
         Get appropriate processor based on content type
-
+
         Args:
             content_type: Content type
-
+
         Returns:
             Corresponding processor instance
         """
@@ -369,18 +383,18 @@ class RAGAnything:
         return self.modal_processors.get("generic")

     async def process_document_complete(
-        self,
-        file_path: str,
+        self,
+        file_path: str,
         output_dir: str = "./output",
         parse_method: str = "auto",
         display_stats: bool = True,
         split_by_character: str | None = None,
         split_by_character_only: bool = False,
-        doc_id: str | None = None
+        doc_id: str | None = None,
     ):
         """
         Complete document processing workflow
-
+
         Args:
             file_path: Path to the file to process
             output_dir: MinerU output directory
@@ -392,35 +406,32 @@ class RAGAnything:
         """
         # Ensure LightRAG is initialized
         await self._ensure_lightrag_initialized()
-
+
         self.logger.info(f"Starting complete document processing: {file_path}")
-
+
         # Step 1: Parse document using MinerU
         content_list, md_content = self.parse_document(
-            file_path,
-            output_dir,
-            parse_method,
-            display_stats
+            file_path, output_dir, parse_method, display_stats
         )
-
+
         # Step 2: Separate text and multimodal content
         text_content, multimodal_items = self._separate_content(content_list)
-
+
         # Step 3: Insert pure text content with all parameters
         if text_content.strip():
             file_name = os.path.basename(file_path)
             await self._insert_text_content(
-                text_content,
+                text_content,
                 file_paths=file_name,
                 split_by_character=split_by_character,
                 split_by_character_only=split_by_character_only,
-                ids=doc_id
+                ids=doc_id,
            )
-
+
         # Step 4: Process multimodal content (using specialized processors)
         if multimodal_items:
             await self._process_multimodal_content(multimodal_items, file_path)
-
+
         self.logger.info(f"Document {file_path} processing complete!")

     async def process_folder_complete(
@@ -433,11 +444,11 @@ class RAGAnything:
         split_by_character_only: bool = False,
         file_extensions: Optional[List[str]] = None,
         recursive: bool = True,
-        max_workers: int = 1
+        max_workers: int = 1,
     ):
         """
         Process all files in a folder in batch
-
+
         Args:
             folder_path: Path to the folder to process
             output_dir: MinerU output directory
@@ -451,75 +462,98 @@ class RAGAnything:
         """
         # Ensure LightRAG is initialized
         await self._ensure_lightrag_initialized()
-
+
         folder_path = Path(folder_path)
         if not folder_path.exists() or not folder_path.is_dir():
-            raise ValueError(f"Folder does not exist or is not a valid directory: {folder_path}")
-
+            raise ValueError(
+                f"Folder does not exist or is not a valid directory: {folder_path}"
+            )
+
         # Supported file formats
         supported_extensions = {
-            ".pdf", ".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif",
-            ".doc", ".docx", ".ppt", ".pptx", ".txt", ".md"
+            ".pdf",
+            ".jpg",
+            ".jpeg",
+            ".png",
+            ".bmp",
+            ".tiff",
+            ".tif",
+            ".doc",
+            ".docx",
+            ".ppt",
+            ".pptx",
+            ".txt",
+            ".md",
         }
-
+
         # Use specified extensions or all supported formats
         if file_extensions:
             target_extensions = set(ext.lower() for ext in file_extensions)
             # Validate if all are supported formats
             unsupported = target_extensions - supported_extensions
             if unsupported:
-                self.logger.warning(f"The following file formats may not be fully supported: {unsupported}")
+                self.logger.warning(
+                    f"The following file formats may not be fully supported: {unsupported}"
+                )
         else:
             target_extensions = supported_extensions
-
+
         # Collect all files to process
         files_to_process = []
-
+
         if recursive:
             # Recursively traverse all subfolders
             for file_path in folder_path.rglob("*"):
-                if file_path.is_file() and file_path.suffix.lower() in target_extensions:
+                if (
+                    file_path.is_file()
+                    and file_path.suffix.lower() in target_extensions
+                ):
                     files_to_process.append(file_path)
         else:
             # Process only current folder
             for file_path in folder_path.glob("*"):
-                if file_path.is_file() and file_path.suffix.lower() in target_extensions:
+                if (
+                    file_path.is_file()
+                    and file_path.suffix.lower() in target_extensions
+                ):
                     files_to_process.append(file_path)
-
+
         if not files_to_process:
             self.logger.info(f"No files to process found in {folder_path}")
             return
-
+
         self.logger.info(f"Found {len(files_to_process)} files to process")
-        self.logger.info(f"File type distribution:")
-
+        self.logger.info("File type distribution:")
+
         # Count file types
         file_type_count = {}
         for file_path in files_to_process:
             ext = file_path.suffix.lower()
             file_type_count[ext] = file_type_count.get(ext, 0) + 1
-
+
         for ext, count in sorted(file_type_count.items()):
             self.logger.info(f"  {ext}: {count} files")
-
+
         # Create progress tracking
         processed_count = 0
         failed_files = []
-
+
         # Use semaphore to control concurrency
         semaphore = asyncio.Semaphore(max_workers)
-
+
         async def process_single_file(file_path: Path, index: int) -> None:
             """Process a single file"""
             async with semaphore:
                 nonlocal processed_count
                 try:
-                    self.logger.info(f"[{index}/{len(files_to_process)}] Processing: {file_path}")
-
+                    self.logger.info(
+                        f"[{index}/{len(files_to_process)}] Processing: {file_path}"
+                    )
+
                     # Create separate output directory for each file
                    file_output_dir = Path(output_dir) / file_path.stem
                    file_output_dir.mkdir(parents=True, exist_ok=True)
-
+
                     # Process file
                     await self.process_document_complete(
                         file_path=str(file_path),
@@ -527,56 +561,56 @@ class RAGAnything:
                         parse_method=parse_method,
                         display_stats=display_stats,
                         split_by_character=split_by_character,
-                        split_by_character_only=split_by_character_only
+                        split_by_character_only=split_by_character_only,
                     )
-
+
                     processed_count += 1
-                    self.logger.info(f"[{index}/{len(files_to_process)}] Successfully processed: {file_path}")
-
+                    self.logger.info(
+                        f"[{index}/{len(files_to_process)}] Successfully processed: {file_path}"
+                    )
+
                 except Exception as e:
-                    self.logger.error(f"[{index}/{len(files_to_process)}] Failed to process: {file_path}")
+                    self.logger.error(
+                        f"[{index}/{len(files_to_process)}] Failed to process: {file_path}"
+                    )
                     self.logger.error(f"Error: {str(e)}")
                     failed_files.append((file_path, str(e)))
-
+
         # Create all processing tasks
         tasks = []
         for index, file_path in enumerate(files_to_process, 1):
             task = process_single_file(file_path, index)
             tasks.append(task)
-
+
         # Wait for all tasks to complete
         await asyncio.gather(*tasks, return_exceptions=True)
-
+
         # Output processing statistics
         self.logger.info("\n===== Batch Processing Complete =====")
         self.logger.info(f"Total files: {len(files_to_process)}")
         self.logger.info(f"Successfully processed: {processed_count}")
         self.logger.info(f"Failed: {len(failed_files)}")
-
+
         if failed_files:
             self.logger.info("\nFailed files:")
             for file_path, error in failed_files:
                 self.logger.info(f"  - {file_path}: {error}")
-
+
         return {
             "total": len(files_to_process),
             "success": processed_count,
             "failed": len(failed_files),
-            "failed_files": failed_files
+            "failed_files": failed_files,
         }

-    async def query_with_multimodal(
-        self,
-        query: str,
-        mode: str = "hybrid"
-    ) -> str:
+    async def query_with_multimodal(self, query: str, mode: str = "hybrid") -> str:
         """
         Query with multimodal content support
-
+
         Args:
             query: Query content
             mode: Query mode
-
+
         Returns:
             Query result
         """
@@ -588,45 +622,65 @@ class RAGAnything:
                 "2. Process documents first using process_document_complete() or process_folder_complete() "
                 "to create and populate the LightRAG instance."
             )
-
-        result = await self.lightrag.aquery(
-            query,
-            param=QueryParam(mode=mode)
-        )
-
+
+        result = await self.lightrag.aquery(query, param=QueryParam(mode=mode))
+
         return result

     def get_processor_info(self) -> Dict[str, Any]:
         """Get processor information"""
         if not self.modal_processors:
             return {"status": "Not initialized"}
-
+
         info = {
             "status": "Initialized",
             "processors": {},
             "models": {
-                "llm_model": "External function" if self.llm_model_func else "Not provided",
-                "vision_model": "External function" if self.vision_model_func else "Not provided",
-                "embedding_model": "External function" if self.embedding_func else "Not provided"
-            }
+                "llm_model": "External function"
+                if self.llm_model_func
+                else "Not provided",
+                "vision_model": "External function"
+                if self.vision_model_func
+                else "Not provided",
+                "embedding_model": "External function"
+                if self.embedding_func
+                else "Not provided",
+            },
         }
-
+
         for proc_type, processor in self.modal_processors.items():
             info["processors"][proc_type] = {
                 "class": processor.__class__.__name__,
-                "supports": self._get_processor_supports(proc_type)
+                "supports": self._get_processor_supports(proc_type),
            }
-
+
         return info

     def _get_processor_supports(self, proc_type: str) -> List[str]:
         """Get processor supported features"""
         supports_map = {
-            "image": ["Image content analysis", "Visual understanding", "Image description generation", "Image entity extraction"],
-            "table": ["Table structure analysis", "Data statistics", "Trend identification", "Table entity extraction"],
-            "equation": ["Mathematical formula parsing", "Variable identification", "Formula meaning explanation", "Formula entity extraction"],
-            "generic": ["General content analysis", "Structured processing", "Entity extraction"]
+            "image": [
+                "Image content analysis",
+                "Visual understanding",
+                "Image description generation",
+                "Image entity extraction",
+            ],
+            "table": [
+                "Table structure analysis",
+                "Data statistics",
+                "Trend identification",
+                "Table entity extraction",
+            ],
+            "equation": [
+                "Mathematical formula parsing",
+                "Variable identification",
+                "Formula meaning explanation",
+                "Formula entity extraction",
+            ],
+            "generic": [
+                "General content analysis",
+                "Structured processing",
+                "Entity extraction",
+            ],
        }
         return supports_map.get(proc_type, ["Basic processing"])
-
-
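
For reviewers who want to exercise the refactored class, below is a minimal end-to-end sketch of the workflow the patch implements (parse, separate, insert text, process multimodal content, query). The `process_document_complete` and `query_with_multimodal` calls follow the signatures in the diff above; the import path and the constructor keywords (`working_dir`, `llm_model_func`, `vision_model_func`, `embedding_func`) are assumptions inferred from the attributes the class uses, and the three `my_*` model functions are hypothetical placeholders you would supply yourself.

```python
import asyncio

# Assumed import path for this branch; adjust to wherever RAGAnything lives.
from lightrag.raganything import RAGAnything


async def main():
    # Constructor keywords are inferred from the attributes used in the diff
    # (self.working_dir, self.llm_model_func, self.vision_model_func,
    # self.embedding_func); the my_* helpers are hypothetical placeholders.
    rag = RAGAnything(
        working_dir="./rag_storage",
        llm_model_func=my_llm_model_func,
        vision_model_func=my_vision_model_func,
        embedding_func=my_embedding_func,
    )

    # Steps 1-4 of the workflow: MinerU parsing, text/multimodal separation,
    # LightRAG text insertion, and modal-processor handling.
    await rag.process_document_complete(
        file_path="./docs/sample.pdf",
        output_dir="./output",
        parse_method="auto",  # "auto", "ocr", or "txt"
        doc_id="sample-doc-1",
    )

    # Query over both the inserted text and the extracted multimodal entities.
    answer = await rag.query_with_multimodal(
        "Summarize the key tables and figures in the document",
        mode="hybrid",
    )
    print(answer)


if __name__ == "__main__":
    asyncio.run(main())
```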
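The batch entry point can be exercised the same way; this fragment continues inside `main()` from the sketch above. The summary-dict keys (`total`, `success`, `failed`, `failed_files`) come straight from the return statement in the diff, and the method returns `None` when no matching files are found, hence the guard. Note that `max_workers` bounds the number of files processed concurrently via the internal `asyncio.Semaphore` rather than spawning threads.

```python
    # Batch-process a folder, reusing the `rag` instance from the sketch above.
    summary = await rag.process_folder_complete(
        folder_path="./documents",
        output_dir="./output",
        parse_method="auto",
        file_extensions=[".pdf", ".docx"],  # subset of the supported set
        recursive=True,  # rglob over subfolders; plain glob otherwise
        max_workers=2,  # concurrency cap enforced by asyncio.Semaphore
    )

    # process_folder_complete returns None when no matching files were found.
    if summary:
        print(f"Processed {summary['success']}/{summary['total']} files")
        for failed_path, error in summary["failed_files"]:
            print(f"  failed: {failed_path}: {error}")
```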