diff --git a/lightrag/operate.py b/lightrag/operate.py index cddf0ddc..91252e00 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -31,6 +31,7 @@ from .utils import ( pick_by_vector_similarity, process_chunks_unified, build_file_path, + sanitize_text_for_encoding, ) from .base import ( BaseGraphStorage, @@ -318,50 +319,62 @@ async def _handle_single_entity_extraction( if len(record_attributes) < 4 or '"entity"' not in record_attributes[0]: return None - # Clean and validate entity name - entity_name = clean_str(record_attributes[1]).strip() - if not entity_name: - logger.warning( - f"Entity extraction error: empty entity name in: {record_attributes}" + try: + # Step 1: Strict UTF-8 encoding sanitization (fail-fast approach) + entity_name = sanitize_text_for_encoding(record_attributes[1]) + + # Step 2: HTML and control character cleaning + entity_name = clean_str(entity_name).strip() + + # Step 3: Business logic normalization + entity_name = normalize_extracted_info(entity_name, is_entity=True) + + # Validate entity name after all cleaning steps + if not entity_name or not entity_name.strip(): + logger.warning( + f"Entity extraction error: entity name became empty after cleaning. Original: '{record_attributes[1]}'" + ) + return None + + # Process entity type with same cleaning pipeline + entity_type = sanitize_text_for_encoding(record_attributes[2]) + entity_type = clean_str(entity_type).strip('"') + if not entity_type.strip() or entity_type.startswith('("'): + logger.warning( + f"Entity extraction error: invalid entity type in: {record_attributes}" + ) + return None + + # Process entity description with same cleaning pipeline + entity_description = sanitize_text_for_encoding(record_attributes[3]) + entity_description = clean_str(entity_description) + entity_description = normalize_extracted_info(entity_description) + + if not entity_description.strip(): + logger.warning( + f"Entity extraction error: empty description for entity '{entity_name}' of type '{entity_type}'" + ) + return None + + return dict( + entity_name=entity_name, + entity_type=entity_type, + description=entity_description, + source_id=chunk_key, + file_path=file_path, + ) + + except ValueError as e: + logger.error( + f"Entity extraction failed due to encoding issues in chunk {chunk_key}: {e}" ) return None - - # Normalize entity name - entity_name = normalize_extracted_info(entity_name, is_entity=True) - - # Check if entity name became empty after normalization - if not entity_name or not entity_name.strip(): - logger.warning( - f"Entity extraction error: entity name became empty after normalization. Original: '{record_attributes[1]}'" + except Exception as e: + logger.error( + f"Entity extraction failed with unexpected error in chunk {chunk_key}: {e}" ) return None - # Clean and validate entity type - entity_type = clean_str(record_attributes[2]).strip('"') - if not entity_type.strip() or entity_type.startswith('("'): - logger.warning( - f"Entity extraction error: invalid entity type in: {record_attributes}" - ) - return None - - # Clean and validate description - entity_description = clean_str(record_attributes[3]) - entity_description = normalize_extracted_info(entity_description) - - if not entity_description.strip(): - logger.warning( - f"Entity extraction error: empty description for entity '{entity_name}' of type '{entity_type}'" - ) - return None - - return dict( - entity_name=entity_name, - entity_type=entity_type, - description=entity_description, - source_id=chunk_key, - file_path=file_path, - ) - async def _handle_single_relationship_extraction( record_attributes: list[str], @@ -370,57 +383,79 @@ async def _handle_single_relationship_extraction( ): if len(record_attributes) < 5 or '"relationship"' not in record_attributes[0]: return None - # add this record as edge - source = clean_str(record_attributes[1]) - target = clean_str(record_attributes[2]) - # Normalize source and target entity names - source = normalize_extracted_info(source, is_entity=True) - target = normalize_extracted_info(target, is_entity=True) + try: + # Process source and target entities with strict cleaning pipeline + # Step 1: Strict UTF-8 encoding sanitization (fail-fast approach) + source = sanitize_text_for_encoding(record_attributes[1]) + # Step 2: HTML and control character cleaning + source = clean_str(source) + # Step 3: Business logic normalization + source = normalize_extracted_info(source, is_entity=True) - # Check if source or target became empty after normalization - if not source or not source.strip(): - logger.warning( - f"Relationship extraction error: source entity became empty after normalization. Original: '{record_attributes[1]}'" + # Same pipeline for target entity + target = sanitize_text_for_encoding(record_attributes[2]) + target = clean_str(target) + target = normalize_extracted_info(target, is_entity=True) + + # Validate entity names after all cleaning steps + if not source or not source.strip(): + logger.warning( + f"Relationship extraction error: source entity became empty after cleaning. Original: '{record_attributes[1]}'" + ) + return None + + if not target or not target.strip(): + logger.warning( + f"Relationship extraction error: target entity became empty after cleaning. Original: '{record_attributes[2]}'" + ) + return None + + if source == target: + logger.debug( + f"Relationship source and target are the same in: {record_attributes}" + ) + return None + + # Process relationship description with same cleaning pipeline + edge_description = sanitize_text_for_encoding(record_attributes[3]) + edge_description = clean_str(edge_description) + edge_description = normalize_extracted_info(edge_description) + + # Process keywords with same cleaning pipeline + edge_keywords = sanitize_text_for_encoding(record_attributes[4]) + edge_keywords = clean_str(edge_keywords) + edge_keywords = normalize_extracted_info(edge_keywords, is_entity=True) + edge_keywords = edge_keywords.replace(",", ",") + + edge_source_id = chunk_key + weight = ( + float(record_attributes[-1].strip('"').strip("'")) + if is_float_regex(record_attributes[-1].strip('"').strip("'")) + else 1.0 + ) + + return dict( + src_id=source, + tgt_id=target, + weight=weight, + description=edge_description, + keywords=edge_keywords, + source_id=edge_source_id, + file_path=file_path, + ) + + except ValueError as e: + logger.error( + f"Relationship extraction failed due to encoding issues in chunk {chunk_key}: {e}" ) return None - - if not target or not target.strip(): - logger.warning( - f"Relationship extraction error: target entity became empty after normalization. Original: '{record_attributes[2]}'" + except Exception as e: + logger.error( + f"Relationship extraction failed with unexpected error in chunk {chunk_key}: {e}" ) return None - if source == target: - logger.debug( - f"Relationship source and target are the same in: {record_attributes}" - ) - return None - - edge_description = clean_str(record_attributes[3]) - edge_description = normalize_extracted_info(edge_description) - - edge_keywords = normalize_extracted_info( - clean_str(record_attributes[4]), is_entity=True - ) - edge_keywords = edge_keywords.replace(",", ",") - - edge_source_id = chunk_key - weight = ( - float(record_attributes[-1].strip('"').strip("'")) - if is_float_regex(record_attributes[-1].strip('"').strip("'")) - else 1.0 - ) - return dict( - src_id=source, - tgt_id=target, - weight=weight, - description=edge_description, - keywords=edge_keywords, - source_id=edge_source_id, - file_path=file_path, - ) - async def _rebuild_knowledge_from_chunks( entities_to_rebuild: dict[str, set[str]], diff --git a/lightrag/utils.py b/lightrag/utils.py index 2d3d485b..cb03c537 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -1577,7 +1577,7 @@ def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str: """Sanitize text to ensure safe UTF-8 encoding by removing or replacing problematic characters. This function handles: - - Surrogate characters (the main cause of the encoding error) + - Surrogate characters (the main cause of encoding errors) - Other invalid Unicode sequences - Control characters that might cause issues - Whitespace trimming @@ -1588,6 +1588,9 @@ def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str: Returns: Sanitized text that can be safely encoded as UTF-8 + + Raises: + ValueError: When text contains uncleanable encoding issues that cannot be safely processed """ if not isinstance(text, str): return str(text) @@ -1624,7 +1627,7 @@ def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str: else: sanitized += char - # Additional cleanup: remove null bytes and other control characters that might cause issues + # Additional cleanup: remove null bytes and other control characters that might cause issues # (but preserve common whitespace like \t, \n, \r) sanitized = re.sub( r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", replacement_char, sanitized @@ -1636,34 +1639,21 @@ def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str: return sanitized except UnicodeEncodeError as e: - logger.warning( - f"Text sanitization: UnicodeEncodeError encountered, applying aggressive cleaning: {str(e)[:100]}" - ) - - # Aggressive fallback: encode with error handling - try: - # Use 'replace' error handling to substitute problematic characters - safe_bytes = text.encode("utf-8", errors="replace") - sanitized = safe_bytes.decode("utf-8") - - # Additional cleanup - sanitized = re.sub( - r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", replacement_char, sanitized - ) - - return sanitized - - except Exception as fallback_error: - logger.error( - f"Text sanitization: Aggressive fallback failed: {str(fallback_error)}" - ) - # Last resort: return a safe placeholder - return f"[TEXT_ENCODING_ERROR: {len(text)} characters]" + # Critical change: Don't return placeholder, raise exception for caller to handle + error_msg = f"Text contains uncleanable UTF-8 encoding issues: {str(e)[:100]}" + logger.error(f"Text sanitization failed: {error_msg}") + raise ValueError(error_msg) from e except Exception as e: logger.error(f"Text sanitization: Unexpected error: {str(e)}") - # Return original text if no encoding issues detected - return text + # For other exceptions, if no encoding issues detected, return original text + try: + text.encode("utf-8") + return text + except UnicodeEncodeError: + raise ValueError( + f"Text sanitization failed with unexpected error: {str(e)}" + ) from e def check_storage_env_vars(storage_name: str) -> None: