From dcec511f7277290d793bd0d5fe1f0602f6c5e094 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 18 Aug 2025 04:37:12 +0800 Subject: [PATCH 1/4] feat: increase file path length limit to 32768 and add schema migration for Milvus DB - Bump path limit to 32768 chars - Add migration detection logic - Implement dual-client migration - Auto-migrate old collections --- lightrag/constants.py | 2 +- lightrag/kg/milvus_impl.py | 225 ++++++++++++++++++++++++++++++++++++- 2 files changed, 223 insertions(+), 4 deletions(-) diff --git a/lightrag/constants.py b/lightrag/constants.py index 1c7917c1..ff74365d 100644 --- a/lightrag/constants.py +++ b/lightrag/constants.py @@ -36,7 +36,7 @@ DEFAULT_ENABLE_RERANK = True DEFAULT_MIN_RERANK_SCORE = 0.0 # File path configuration for vector and graph database(Should not be changed, used in Milvus Schema) -DEFAULT_MAX_FILE_PATH_LENGTH = 4090 +DEFAULT_MAX_FILE_PATH_LENGTH = 32768 # Default temperature for LLM DEFAULT_TEMPERATURE = 1.0 diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index 4d927353..94eea9b3 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -83,7 +83,7 @@ class MilvusVectorDBStorage(BaseVectorStorage): FieldSchema( name="file_path", dtype=DataType.VARCHAR, - max_length=1024, + max_length=DEFAULT_MAX_FILE_PATH_LENGTH, nullable=True, ), ] @@ -95,7 +95,7 @@ class MilvusVectorDBStorage(BaseVectorStorage): FieldSchema( name="file_path", dtype=DataType.VARCHAR, - max_length=1024, + max_length=DEFAULT_MAX_FILE_PATH_LENGTH, nullable=True, ), ] @@ -482,8 +482,33 @@ class MilvusVectorDBStorage(BaseVectorStorage): ) return + def _check_file_path_length_restriction(self, collection_info: dict) -> bool: + """Check if collection has file_path length restrictions that need migration + + Returns: + bool: True if migration is needed, False otherwise + """ + existing_fields = { + field["name"]: field for field in collection_info.get("fields", []) + } + + # Check if file_path field exists and has length restrictions + if "file_path" in existing_fields: + file_path_field = existing_fields["file_path"] + # Get max_length from field params + max_length = file_path_field.get("params", {}).get("max_length") + + if max_length and max_length != DEFAULT_MAX_FILE_PATH_LENGTH: + logger.info( + f"[{self.workspace}] Collection {self.namespace} has file_path max_length={max_length}, " + f"needs migration to {DEFAULT_MAX_FILE_PATH_LENGTH}" + ) + return True + + return False + def _check_schema_compatibility(self, collection_info: dict): - """Check schema field compatibility""" + """Check schema field compatibility and detect migration needs""" existing_fields = { field["name"]: field for field in collection_info.get("fields", []) } @@ -505,6 +530,14 @@ class MilvusVectorDBStorage(BaseVectorStorage): ) return + # Check if migration is needed for file_path length restrictions + if self._check_file_path_length_restriction(collection_info): + logger.info( + f"[{self.workspace}] Starting automatic migration for collection {self.namespace}" + ) + self._migrate_collection_schema() + return + # For collections with vector field, check basic compatibility # Only check for critical incompatibilities, not missing optional fields critical_fields = {"id": {"type": "VarChar", "is_primary": True}} @@ -543,6 +576,192 @@ class MilvusVectorDBStorage(BaseVectorStorage): f"[{self.workspace}] Schema compatibility check passed for {self.namespace}" ) + def _create_migration_client(self) -> MilvusClient: + """Create a new MilvusClient instance for migration operations""" + return MilvusClient( + uri=os.environ.get( + "MILVUS_URI", + config.get( + "milvus", + "uri", + fallback=os.path.join( + self.global_config["working_dir"], "milvus_lite.db" + ), + ), + ), + user=os.environ.get( + "MILVUS_USER", config.get("milvus", "user", fallback=None) + ), + password=os.environ.get( + "MILVUS_PASSWORD", + config.get("milvus", "password", fallback=None), + ), + token=os.environ.get( + "MILVUS_TOKEN", config.get("milvus", "token", fallback=None) + ), + db_name=os.environ.get( + "MILVUS_DB_NAME", + config.get("milvus", "db_name", fallback=None), + ), + ) + + def _migrate_collection_schema(self): + """Migrate collection schema using optimized dual client architecture""" + read_client = None + original_collection_name = self.final_namespace + temp_collection_name = f"{self.final_namespace}_temp" + + try: + logger.info( + f"[{self.workspace}] Starting optimized dual-client schema migration for {self.namespace}" + ) + + if self._client is not None: + self._client.close() + + # Step 1: use self._client to create a temporary collection + logger.info( + f"[{self.workspace}] Step 1: Creating temporary collection: {temp_collection_name}" + ) + # Create indexes for the new collection using self._client + # Temporarily update final_namespace for index creation + self.final_namespace = temp_collection_name + + # Create new client instance for self._client + self._client = self._create_migration_client() + new_schema = self._create_schema_for_namespace() + self._client.create_collection( + collection_name=temp_collection_name, schema=new_schema + ) + try: + self._create_indexes_after_collection() + except Exception as index_error: + logger.warning( + f"[{self.workspace}] Failed to create indexes for new collection: {index_error}" + ) + # Continue with migration even if index creation fails + + # Load the new collection + self._client.load_collection(temp_collection_name) + + # Step 2: copy old data to new collection + logger.info( + f"[{self.workspace}] Step 2: Copying old data to new collection: {original_collection_name}" + ) + read_client = self._create_migration_client() + read_client.load_collection(original_collection_name) + + page_size = 1000 + offset = 0 + total_migrated = 0 + while True: + # Read data from old collection using read_client + page_data = read_client.query( + collection_name=original_collection_name, + filter="", # Empty filter to get all data + output_fields=["*"], # Get all fields + limit=page_size, + offset=offset, + ) + + if not page_data: + # No more data to retrieve + break + + # Write data to new collection using self._client + try: + self._client.insert( + collection_name=temp_collection_name, data=page_data + ) + total_migrated += len(page_data) + + logger.debug( + f"[{self.workspace}] Migrated batch {offset//page_size + 1}, " + f"processed {len(page_data)} records, total migrated: {total_migrated}" + ) + except Exception as batch_error: + logger.error( + f"[{self.workspace}] Failed to migrate batch {offset//page_size + 1}: {batch_error}" + ) + raise + + offset += page_size + + # If we got less than page_size records, we've reached the end + if len(page_data) < page_size: + break + + if total_migrated > 0: + logger.info( + f"[{self.workspace}] Successfully migrated {total_migrated} records" + ) + else: + logger.info( + f"[{self.workspace}] No data found in original collection, migration completed" + ) + + # Step 3: drop old collection + logger.info(f"[{self.workspace}] Step 3: Dropping old collection") + read_client.drop_collection(original_collection_name) + read_client.close() + read_client = None + + # Step 4: Rename temporary collection to original name + logger.info( + f"[{self.workspace}] Step 4: Renaming collection {temp_collection_name} -> {original_collection_name}" + ) + # create new client instance for self._client + self._client = self._create_migration_client() + + try: + self._client.rename_collection( + temp_collection_name, original_collection_name + ) + logger.info(f"[{self.workspace}] Rename operation completed") + except Exception as rename_error: + logger.error( + f"[{self.workspace}] Rename operation failed: {rename_error}" + ) + raise RuntimeError( + f"Failed to rename collection: {rename_error}" + ) from rename_error + + # restore final_namespace + self.final_namespace = original_collection_name + + except Exception as e: + logger.error( + f"[{self.workspace}] Data migration failed for {self.namespace}: {e}" + ) + + # Attempt cleanup of temporary collection if it exists + try: + if self._client and self._client.has_collection(temp_collection_name): + logger.info( + f"[{self.workspace}] Cleaning up failed migration temporary collection" + ) + self._client.drop_collection(temp_collection_name) + except Exception as cleanup_error: + logger.warning( + f"[{self.workspace}] Failed to cleanup temporary collection: {cleanup_error}" + ) + + # Re-raise the original error + raise RuntimeError( + f"Optimized dual-client migration failed for collection {self.namespace}: {e}" + ) from e + + finally: + # Ensure read_client is properly closed + if read_client: + try: + read_client.close() + logger.debug(f"[{self.workspace}] Read client closed successfully") + except Exception as close_error: + logger.warning( + f"[{self.workspace}] Failed to close read client: {close_error}" + ) + def _validate_collection_compatibility(self): """Validate existing collection's dimension and schema compatibility""" try: From 453efeb92427d40b84ca455d4304f543058bc090 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 18 Aug 2025 13:59:27 +0800 Subject: [PATCH 2/4] Fix file path length checking to use UTF-8 byte length instead of char count --- lightrag/utils.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/lightrag/utils.py b/lightrag/utils.py index 055a2b27..5052897b 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -2013,7 +2013,7 @@ async def process_chunks_unified( def build_file_path(already_file_paths, data_list, target): - """Build file path string with length limit and deduplication + """Build file path string with UTF-8 byte length limit and deduplication Args: already_file_paths: List of existing file paths @@ -2028,6 +2028,14 @@ def build_file_path(already_file_paths, data_list, target): # string: filter empty value and keep file order in already_file_paths file_paths = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp) + + # Check if initial file_paths already exceeds byte length limit + if len(file_paths.encode("utf-8")) >= DEFAULT_MAX_FILE_PATH_LENGTH: + logger.warning( + f"Initial file_paths already exceeds {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, " + f"current size: {len(file_paths.encode('utf-8'))} bytes" + ) + # ignored file_paths file_paths_ignore = "" # add file_paths @@ -2043,22 +2051,22 @@ def build_file_path(already_file_paths, data_list, target): # add file_paths_set.add(cur_file_path) - # check the length + # check the UTF-8 byte length + new_addition = GRAPH_FIELD_SEP + cur_file_path if file_paths else cur_file_path if ( - len(file_paths) + len(GRAPH_FIELD_SEP + cur_file_path) - < DEFAULT_MAX_FILE_PATH_LENGTH + len(file_paths.encode("utf-8")) + len(new_addition.encode("utf-8")) + < DEFAULT_MAX_FILE_PATH_LENGTH - 5 ): # append - file_paths += ( - GRAPH_FIELD_SEP + cur_file_path if file_paths else cur_file_path - ) + file_paths += new_addition else: # ignore file_paths_ignore += GRAPH_FIELD_SEP + cur_file_path if file_paths_ignore: logger.warning( - f"Length of file_path exceeds {target}, ignoring new file: {file_paths_ignore}" + f"File paths exceed {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, " + f"ignoring file path: {file_paths_ignore}" ) return file_paths From 47b8caaf641d67bbcc3f698a5d1317248f846da5 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 18 Aug 2025 14:15:07 +0800 Subject: [PATCH 3/4] Stop execution on validation errors in Milvus storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Stop execution on validation errors to prevent potential data loss --- lightrag/kg/milvus_impl.py | 47 ++++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 7 deletions(-) diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index 94eea9b3..a90b9e59 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -822,14 +822,42 @@ class MilvusVectorDBStorage(BaseVectorStorage): # Ensure the collection is loaded after validation self._ensure_collection_loaded() return - except Exception as describe_error: - logger.warning( - f"[{self.workspace}] Collection '{self.namespace}' exists but cannot be described: {describe_error}" + except Exception as validation_error: + # CRITICAL: Collection exists but validation failed + # This indicates potential data migration failure or incompatible schema + # Stop execution to prevent data loss and require manual intervention + logger.error( + f"[{self.workspace}] CRITICAL ERROR: Collection '{self.namespace}' exists but validation failed!" ) - logger.info( - f"[{self.workspace}] Treating as if collection doesn't exist and creating new one..." + logger.error( + f"[{self.workspace}] This indicates potential data migration failure or schema incompatibility." + ) + logger.error( + f"[{self.workspace}] Validation error: {validation_error}" + ) + logger.error(f"[{self.workspace}] MANUAL INTERVENTION REQUIRED:") + logger.error( + f"[{self.workspace}] 1. Check the existing collection schema and data integrity" + ) + logger.error( + f"[{self.workspace}] 2. Backup existing data if needed" + ) + logger.error( + f"[{self.workspace}] 3. Manually resolve schema compatibility issues" + ) + logger.error( + f"[{self.workspace}] 4. Consider dropping and recreating the collection if data is not critical" + ) + logger.error( + f"[{self.workspace}] Program execution stopped to prevent potential data loss." + ) + + # Raise a specific exception to stop execution + raise RuntimeError( + f"Collection validation failed for '{self.final_namespace}'. " + f"Data migration failure detected. Manual intervention required to prevent data loss. " + f"Original error: {validation_error}" ) - # Fall through to creation logic # Collection doesn't exist, create new collection logger.info(f"[{self.workspace}] Creating new collection: {self.namespace}") @@ -850,12 +878,17 @@ class MilvusVectorDBStorage(BaseVectorStorage): f"[{self.workspace}] Successfully created Milvus collection: {self.namespace}" ) + except RuntimeError: + # Re-raise RuntimeError (validation failures) without modification + # These are critical errors that should stop execution + raise + except Exception as e: logger.error( f"[{self.workspace}] Error in _create_collection_if_not_exist for {self.namespace}: {e}" ) - # If there's any error, try to force create the collection + # If there's any error (other than validation failure), try to force create the collection logger.info( f"[{self.workspace}] Attempting to force create collection {self.namespace}..." ) From a9d680743274b556faf070ac9587928b5e8f82f8 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 18 Aug 2025 16:29:03 +0800 Subject: [PATCH 4/4] Fix query windows size limitation for Milvus data migration --- lightrag/kg/milvus_impl.py | 167 +++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 91 deletions(-) diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index a90b9e59..82dce30c 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -498,7 +498,7 @@ class MilvusVectorDBStorage(BaseVectorStorage): # Get max_length from field params max_length = file_path_field.get("params", {}).get("max_length") - if max_length and max_length != DEFAULT_MAX_FILE_PATH_LENGTH: + if max_length and max_length < DEFAULT_MAX_FILE_PATH_LENGTH: logger.info( f"[{self.workspace}] Collection {self.namespace} has file_path max_length={max_length}, " f"needs migration to {DEFAULT_MAX_FILE_PATH_LENGTH}" @@ -576,59 +576,23 @@ class MilvusVectorDBStorage(BaseVectorStorage): f"[{self.workspace}] Schema compatibility check passed for {self.namespace}" ) - def _create_migration_client(self) -> MilvusClient: - """Create a new MilvusClient instance for migration operations""" - return MilvusClient( - uri=os.environ.get( - "MILVUS_URI", - config.get( - "milvus", - "uri", - fallback=os.path.join( - self.global_config["working_dir"], "milvus_lite.db" - ), - ), - ), - user=os.environ.get( - "MILVUS_USER", config.get("milvus", "user", fallback=None) - ), - password=os.environ.get( - "MILVUS_PASSWORD", - config.get("milvus", "password", fallback=None), - ), - token=os.environ.get( - "MILVUS_TOKEN", config.get("milvus", "token", fallback=None) - ), - db_name=os.environ.get( - "MILVUS_DB_NAME", - config.get("milvus", "db_name", fallback=None), - ), - ) - def _migrate_collection_schema(self): - """Migrate collection schema using optimized dual client architecture""" - read_client = None + """Migrate collection schema using query_iterator - completely solves query window limitations""" original_collection_name = self.final_namespace temp_collection_name = f"{self.final_namespace}_temp" + iterator = None try: logger.info( - f"[{self.workspace}] Starting optimized dual-client schema migration for {self.namespace}" + f"[{self.workspace}] Starting iterator-based schema migration for {self.namespace}" ) - if self._client is not None: - self._client.close() - - # Step 1: use self._client to create a temporary collection + # Step 1: Create temporary collection with new schema logger.info( f"[{self.workspace}] Step 1: Creating temporary collection: {temp_collection_name}" ) - # Create indexes for the new collection using self._client # Temporarily update final_namespace for index creation self.final_namespace = temp_collection_name - - # Create new client instance for self._client - self._client = self._create_migration_client() new_schema = self._create_schema_for_namespace() self._client.create_collection( collection_name=temp_collection_name, schema=new_schema @@ -644,75 +608,94 @@ class MilvusVectorDBStorage(BaseVectorStorage): # Load the new collection self._client.load_collection(temp_collection_name) - # Step 2: copy old data to new collection + # Step 2: Copy data using query_iterator (solves query window limitation) logger.info( - f"[{self.workspace}] Step 2: Copying old data to new collection: {original_collection_name}" + f"[{self.workspace}] Step 2: Copying data using query_iterator from: {original_collection_name}" ) - read_client = self._create_migration_client() - read_client.load_collection(original_collection_name) - page_size = 1000 - offset = 0 - total_migrated = 0 - while True: - # Read data from old collection using read_client - page_data = read_client.query( + # Create query iterator + try: + iterator = self._client.query_iterator( collection_name=original_collection_name, - filter="", # Empty filter to get all data + batch_size=2000, # Adjustable batch size for optimal performance output_fields=["*"], # Get all fields - limit=page_size, - offset=offset, ) + logger.debug(f"[{self.workspace}] Query iterator created successfully") + except Exception as iterator_error: + logger.error( + f"[{self.workspace}] Failed to create query iterator: {iterator_error}" + ) + raise - if not page_data: - # No more data to retrieve - break + # Iterate through all data + total_migrated = 0 + batch_number = 1 - # Write data to new collection using self._client + while True: try: - self._client.insert( - collection_name=temp_collection_name, data=page_data - ) - total_migrated += len(page_data) + batch_data = iterator.next() + if not batch_data: + # No more data available + break - logger.debug( - f"[{self.workspace}] Migrated batch {offset//page_size + 1}, " - f"processed {len(page_data)} records, total migrated: {total_migrated}" - ) - except Exception as batch_error: + # Insert batch data to new collection + try: + self._client.insert( + collection_name=temp_collection_name, data=batch_data + ) + total_migrated += len(batch_data) + + logger.info( + f"[{self.workspace}] Iterator batch {batch_number}: " + f"processed {len(batch_data)} records, total migrated: {total_migrated}" + ) + batch_number += 1 + + except Exception as batch_error: + logger.error( + f"[{self.workspace}] Failed to insert iterator batch {batch_number}: {batch_error}" + ) + raise + + except Exception as next_error: logger.error( - f"[{self.workspace}] Failed to migrate batch {offset//page_size + 1}: {batch_error}" + f"[{self.workspace}] Iterator next() failed at batch {batch_number}: {next_error}" ) raise - offset += page_size - - # If we got less than page_size records, we've reached the end - if len(page_data) < page_size: - break - if total_migrated > 0: logger.info( - f"[{self.workspace}] Successfully migrated {total_migrated} records" + f"[{self.workspace}] Successfully migrated {total_migrated} records using iterator" ) else: logger.info( f"[{self.workspace}] No data found in original collection, migration completed" ) - # Step 3: drop old collection - logger.info(f"[{self.workspace}] Step 3: Dropping old collection") - read_client.drop_collection(original_collection_name) - read_client.close() - read_client = None + # Step 3: Rename origin collection (keep for safety) + logger.info( + f"[{self.workspace}] Step 3: Rename origin collection to {original_collection_name}_old" + ) + try: + self._client.rename_collection( + original_collection_name, f"{original_collection_name}_old" + ) + except Exception as rename_error: + try: + logger.warning( + f"[{self.workspace}] Try to drop origin collection instead" + ) + self._client.drop_collection(original_collection_name) + except Exception as e: + logger.error( + f"[{self.workspace}] Rename operation failed: {rename_error}" + ) + raise e # Step 4: Rename temporary collection to original name logger.info( f"[{self.workspace}] Step 4: Renaming collection {temp_collection_name} -> {original_collection_name}" ) - # create new client instance for self._client - self._client = self._create_migration_client() - try: self._client.rename_collection( temp_collection_name, original_collection_name @@ -726,12 +709,12 @@ class MilvusVectorDBStorage(BaseVectorStorage): f"Failed to rename collection: {rename_error}" ) from rename_error - # restore final_namespace + # Restore final_namespace self.final_namespace = original_collection_name except Exception as e: logger.error( - f"[{self.workspace}] Data migration failed for {self.namespace}: {e}" + f"[{self.workspace}] Iterator-based migration failed for {self.namespace}: {e}" ) # Attempt cleanup of temporary collection if it exists @@ -748,18 +731,20 @@ class MilvusVectorDBStorage(BaseVectorStorage): # Re-raise the original error raise RuntimeError( - f"Optimized dual-client migration failed for collection {self.namespace}: {e}" + f"Iterator-based migration failed for collection {self.namespace}: {e}" ) from e finally: - # Ensure read_client is properly closed - if read_client: + # Ensure iterator is properly closed + if iterator: try: - read_client.close() - logger.debug(f"[{self.workspace}] Read client closed successfully") + iterator.close() + logger.debug( + f"[{self.workspace}] Query iterator closed successfully" + ) except Exception as close_error: logger.warning( - f"[{self.workspace}] Failed to close read client: {close_error}" + f"[{self.workspace}] Failed to close query iterator: {close_error}" ) def _validate_collection_compatibility(self):