feat: add RecursiveSplitter component for Document preprocessing (#8605)

* initial import

* adding initial version + tests

* adding more tests

* more tests

* incorporating SentenceSplitter based on NLTK

* adding more tests

* adding release notes

* adding LICENSE header

* removing unused imports

* fixing example docstring

* adding docstrings

* fixing tests and returning a dictionary

* updating release notes

* addressing PR comments

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* wip: updating tests for split_idx_start and _split_overlap

* adding tests for split_idx and split_start and overlaps

* adjusting file for LICENSE checking

* adding more tests

* adding tests for page numbering

* adding tests for min split lengths and falling back to character-level chunking based on size

* fixing linting issue

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* wip

* wip

* updating tests

* wip: fixing all tests after changes

* more tests

* wip: debugging sentence overlap

* wip: debugging page number

* wip

* wip; fixed bug with sentence tokenizer, needs to keep white spaces

* adding tests for counting pages on different split approaches

* NLTK checks done on SentenceSplitter

* fixing types

* adding detection of full overlap with previous chunks

* fixing types

* improving docstring

* improving docstring

* adding custom length, 'character' use case

* customising overlap function for word and adding a few tests

* updating docstring

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* wip: adding more tests for word unit length

* fix

* feat: `Tool` dataclass - unified abstraction to represent tools (#8652)

* draft

* del HF token in tests

* adaptations

* progress

* fix type

* import sorting

* more control on deserialization

* release note

* improvements

* support name field

* fix chatpromptbuilder test

* port Tool from experimental

* release note

* docs upd

* Update tool.py

---------

Co-authored-by: Daria Fokina <daria.fokina@deepset.ai>

* fix: fix deserialization issues in multi-threading environments (#8651)

* adding 'word' as default length

* fixing types

* handling both default strategies

* wip

* \f was not being counted properly

* updating tests

* fixing the overlap bug

* adding more tests

* refactoring _apply_overlap

* further refactoring

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* Update haystack/components/preprocessors/recursive_splitter.py

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>

* adding ticks to close code block

* fixing comments

* applying changes: split with space and force keep_white_spaces=True

* fixing some tests and replacing count words approach in more places

* keep_white_spaces = True only if not defined

* cleaning docs

* handling more edge cases where a split is still too big after all separators have been tried

* fixing fallback whitespaces count to fixed word/char split based on split size

* cleaning

---------

Co-authored-by: Sebastian Husch Lee <sjrl@users.noreply.github.com>
Co-authored-by: Stefano Fiorucci <stefanofiorucci@gmail.com>
Co-authored-by: Daria Fokina <daria.fokina@deepset.ai>
Co-authored-by: Tobias Wochinger <tobias.wochinger@deepset.ai>
David S. Batista 2025-01-10 17:28:53 +01:00 committed by GitHub
parent 741ce5df50
commit 4f73b192f8
5 changed files with 1246 additions and 2 deletions


@@ -5,7 +5,7 @@
from .document_cleaner import DocumentCleaner
from .document_splitter import DocumentSplitter
from .nltk_document_splitter import NLTKDocumentSplitter
from .sentence_tokenizer import SentenceSplitter
from .recursive_splitter import RecursiveDocumentSplitter
from .text_cleaner import TextCleaner
__all__ = ["DocumentSplitter", "DocumentCleaner", "NLTKDocumentSplitter", "SentenceSplitter", "TextCleaner"]
__all__ = ["DocumentSplitter", "DocumentCleaner", "RecursiveDocumentSplitter", "TextCleaner", "NLTKDocumentSplitter"]


@@ -0,0 +1,421 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0
import re
from copy import deepcopy
from typing import Any, Dict, List, Literal, Optional, Tuple
from haystack import Document, component, logging
logger = logging.getLogger(__name__)
@component
class RecursiveDocumentSplitter:
"""
Recursively chunk text into smaller chunks.
This component splits text into smaller chunks by recursively applying a list of separators to the text.
The separators are applied in the order they are provided, with the last separator typically being the most
specific one.
Each separator is applied to the text; chunks that fit within the split length are kept, while chunks that are
still too large are split again using the next separator in the list.
This continues until all chunks are within the split_length parameter.
Example:
```python
from haystack import Document
from haystack.components.preprocessors import RecursiveDocumentSplitter
chunker = RecursiveDocumentSplitter(split_length=260, split_overlap=0, split_unit="char", separators=["\n\n", "\n", ".", " "])
text = '''Artificial intelligence (AI) - Introduction
AI, in its broadest sense, is intelligence exhibited by machines, particularly computer systems.
AI technology is widely used throughout industry, government, and science. Some high-profile applications include advanced web search engines; recommendation systems; interacting via human speech; autonomous vehicles; generative and creative tools; and superhuman play and analysis in strategy games.'''
chunker.warm_up()
doc = Document(content=text)
doc_chunks = chunker.run([doc])
print(doc_chunks["documents"])
>[
>Document(id=..., content: 'Artificial intelligence (AI) - Introduction\n\n', meta: {'original_id': '65167a9823dd883de577e828ca4fd529e6f7241f0ff616acfce454d808478951', 'split_id': 0, 'split_idx_start': 0, '_split_overlap': []})
>Document(id=..., content: 'AI, in its broadest sense, is intelligence exhibited by machines, particularly computer systems.\n', meta: {'original_id': '65167a9823dd883de577e828ca4fd529e6f7241f0ff616acfce454d808478951', 'split_id': 1, 'split_idx_start': 45, '_split_overlap': []})
>Document(id=..., content: 'AI technology is widely used throughout industry, government, and science.', meta: {'original_id': '65167a9823dd883de577e828ca4fd529e6f7241f0ff616acfce454d808478951', 'split_id': 2, 'split_idx_start': 142, '_split_overlap': []})
>Document(id=..., content: ' Some high-profile applications include advanced web search engines; recommendation systems; interac...', meta: {'original_id': '65167a9823dd883de577e828ca4fd529e6f7241f0ff616acfce454d808478951', 'split_id': 3, 'split_idx_start': 216, '_split_overlap': []})
>]
```
""" # noqa: E501
def __init__(
self,
*,
split_length: int = 200,
split_overlap: int = 0,
split_unit: Literal["word", "char"] = "word",
separators: Optional[List[str]] = None,
sentence_splitter_params: Optional[Dict[str, Any]] = None,
):
"""
Initializes a RecursiveDocumentSplitter.
:param split_length: The maximum length of each chunk, by default in words but optionally in characters.
See the `split_unit` parameter.
:param split_overlap: The number of units (words or characters, depending on `split_unit`) that consecutive chunks overlap.
:param split_unit: The unit of the split_length parameter. It can be either "word" or "char".
:param separators: An optional list of separator strings to use for splitting the text. The separator
strings are matched literally (any regex special characters are escaped), except for the special separator
"sentence", in which case the text is split into sentences using a custom sentence tokenizer based on NLTK.
See: haystack.components.preprocessors.sentence_tokenizer.SentenceSplitter.
If no separators are provided, the default separators ["\n\n", "sentence", "\n", " "] are used.
:param sentence_splitter_params: Optional parameters to pass to the sentence tokenizer.
See: haystack.components.preprocessors.sentence_tokenizer.SentenceSplitter for more information.
:raises ValueError: If the split length is smaller than 1, if the overlap is negative or greater than or
equal to the split length, or if any separator is not a string.
"""
self.split_length = split_length
self.split_overlap = split_overlap
self.split_units = split_unit
self.separators = separators if separators else ["\n\n", "sentence", "\n", " "] # default separators
self._check_params()
self.nltk_tokenizer = None
self.sentence_splitter_params = (
{"keep_white_spaces": True} if sentence_splitter_params is None else sentence_splitter_params
)
def warm_up(self) -> None:
"""
Warm up the sentence tokenizer.
"""
self.nltk_tokenizer = self._get_custom_sentence_tokenizer(self.sentence_splitter_params)
def _check_params(self) -> None:
if self.split_length < 1:
raise ValueError("Split length must be at least 1.")
if self.split_overlap < 0:
raise ValueError("Overlap must not be negative.")
if self.split_overlap >= self.split_length:
raise ValueError("Overlap cannot be greater than or equal to the chunk size.")
if not all(isinstance(separator, str) for separator in self.separators):
raise ValueError("All separators must be strings.")
@staticmethod
def _get_custom_sentence_tokenizer(sentence_splitter_params: Dict[str, Any]):
from haystack.components.preprocessors.sentence_tokenizer import SentenceSplitter
return SentenceSplitter(**sentence_splitter_params)
def _split_chunk(self, current_chunk: str) -> Tuple[str, str]:
"""
Splits a chunk based on the split_length and split_units attribute.
:param current_chunk: The current chunk to be split.
:returns:
A tuple containing the current chunk and the remaining words or characters.
"""
if self.split_units == "word":
words = current_chunk.split()
current_chunk = " ".join(words[: self.split_length])
remaining_words = words[self.split_length :]
return current_chunk, " ".join(remaining_words)
# split by characters
text = current_chunk
current_chunk = text[: self.split_length]
remaining_chars = text[self.split_length :]
return current_chunk, remaining_chars
def _apply_overlap(self, chunks: List[str]) -> List[str]:
"""
Applies an overlap between consecutive chunks if the split_overlap attribute is greater than zero.
Works for both word- and character-level splitting. It trims the last chunk if it exceeds the split_length and
adds the trimmed content to the next chunk. If the last chunk is still too long after trimming, it splits it
and adds the first chunk to the list. This process continues until the last chunk is within the split_length.
:param chunks: A list of text chunks.
:returns:
A list of text chunks with the overlap applied.
"""
overlapped_chunks: List[str] = []
for idx, chunk in enumerate(chunks):
if idx == 0:
overlapped_chunks.append(chunk)
continue
# get the overlap between the current and previous chunk
overlap, prev_chunk = self._get_overlap(overlapped_chunks)
if overlap == prev_chunk:
logger.warning(
"Overlap is the same as the previous chunk. "
"Consider increasing the `split_length` parameter or decreasing the `split_overlap` parameter."
)
# create a new chunk starting with the overlap
current_chunk = overlap + " " + chunk if self.split_units == "word" else overlap + chunk
# if this new chunk exceeds 'split_length', trim it and move the remaining text to the next chunk
# if this is the last chunk, another new chunk will contain the trimmed text preceded by the overlap
# of the last chunk
if self._chunk_length(current_chunk) > self.split_length:
current_chunk, remaining_text = self._split_chunk(current_chunk)
if idx < len(chunks) - 1:
chunks[idx + 1] = remaining_text + (" " if self.split_units == "word" else "") + chunks[idx + 1]
elif remaining_text:
# create a new chunk with the trimmed text preceded by the overlap of the last chunk
overlapped_chunks.append(current_chunk)
chunk = remaining_text
overlap, _ = self._get_overlap(overlapped_chunks)
current_chunk = overlap + " " + chunk if self.split_units == "word" else overlap + chunk
overlapped_chunks.append(current_chunk)
# it can still be that the new last chunk exceeds the 'split_length'
# continue splitting until the last chunk is within 'split_length'
if idx == len(chunks) - 1 and self._chunk_length(current_chunk) > self.split_length:
last_chunk = overlapped_chunks.pop()
first_chunk, remaining_chunk = self._split_chunk(last_chunk)
overlapped_chunks.append(first_chunk)
while remaining_chunk:
# combine overlap with remaining chunk
overlap, _ = self._get_overlap(overlapped_chunks)
current = overlap + (" " if self.split_units == "word" else "") + remaining_chunk
# if it fits within split_length we are done
if self._chunk_length(current) <= self.split_length:
overlapped_chunks.append(current)
break
# otherwise split it again
first_chunk, remaining_chunk = self._split_chunk(current)
overlapped_chunks.append(first_chunk)
return overlapped_chunks
def _get_overlap(self, overlapped_chunks: List[str]) -> Tuple[str, str]:
"""Return the overlap from the end of the previous (already overlapped) chunk, together with that chunk."""
prev_chunk = overlapped_chunks[-1]
overlap_start = max(0, self._chunk_length(prev_chunk) - self.split_overlap)
if self.split_units == "word":
word_chunks = prev_chunk.split()
overlap = " ".join(word_chunks[overlap_start:])
else:
overlap = prev_chunk[overlap_start:]
return overlap, prev_chunk
def _chunk_length(self, text: str) -> int:
"""
Get the length of a chunk in the configured split units.
:param text: The text to be measured.
:returns: The number of words in the text if split_units is "word", otherwise the number of characters.
"""
if self.split_units == "word":
words = [word for word in text.split(" ") if word]
return len(words)
return len(text)
def _chunk_text(self, text: str) -> List[str]:
"""
Recursive chunking algorithm that divides text into smaller chunks based on a list of separator characters.
It starts with a list of separator characters (e.g., ["\n\n", "sentence", "\n", " "]) and attempts to divide
the text using the first separator. If the resulting chunks are still larger than the specified chunk size,
it moves to the next separator in the list. This process continues recursively, progressively applying more
specific separators until the chunks meet the desired size criteria.
:param text: The text to be split into chunks.
:returns:
A list of text chunks.
"""
if self._chunk_length(text) <= self.split_length:
return [text]
for curr_separator in self.separators: # type: ignore # the caller already checked that separators is not None
if curr_separator == "sentence":
# type: ignore is safe here: the SentenceSplitter is initialized in warm_up(), which is expected to be called before run()
sentence_with_spans = self.nltk_tokenizer.split_sentences(text) # type: ignore
splits = [sentence["sentence"] for sentence in sentence_with_spans]
else:
# escape the separator and wrap it in a capture group so that it's included in the splits as well
escaped_separator = re.escape(curr_separator)
escaped_separator = f"({escaped_separator})"
# split the text and merge every two consecutive splits, i.e.: the text and the separator after it
splits = re.split(escaped_separator, text)
splits = [
"".join([splits[i], splits[i + 1]]) if i < len(splits) - 1 else splits[i]
for i in range(0, len(splits), 2)
]
# remove last split if it's empty
splits = splits[:-1] if splits[-1] == "" else splits
if len(splits) == 1: # go to next separator, if current separator not found in the text
continue
chunks = []
current_chunk: List[str] = []
current_length = 0
# check splits, if any is too long, recursively chunk it, otherwise add to current chunk
for split in splits:
split_text = split
# if adding this split exceeds chunk_size, process current_chunk
if current_length + self._chunk_length(split_text) > self.split_length:
# process current_chunk
if current_chunk: # keep the good splits
chunks.append("".join(current_chunk))
current_chunk = []
current_length = 0
# recursively handle splits that are too large
if self._chunk_length(split_text) > self.split_length:
if curr_separator == self.separators[-1]:
# tried last separator, can't split further, do a fixed-split based on word/character
fall_back_chunks = self._fall_back_to_fixed_chunking(split_text, self.split_units)
chunks.extend(fall_back_chunks)
else:
chunks.extend(self._chunk_text(split_text))
current_length += self._chunk_length(split_text)
else:
current_chunk.append(split_text)
current_length += self._chunk_length(split_text)
else:
current_chunk.append(split_text)
current_length += self._chunk_length(split_text)
if current_chunk:
chunks.append("".join(current_chunk))
if self.split_overlap > 0:
chunks = self._apply_overlap(chunks)
if chunks:
return chunks
# if no separator worked, fall back to word- or character-level chunking
return self._fall_back_to_fixed_chunking(text, self.split_units)
def _fall_back_to_fixed_chunking(self, text: str, split_units: Literal["word", "char"]) -> List[str]:
"""
Fall back to a fixed chunking approach if no separator works for the text.
Splits the text into smaller chunks based on the split_length and split_units attributes, either by words or
characters. It splits into words using whitespace as a separator.
:param text: The text to be split into chunks.
:param split_units: The unit of the split_length parameter. It can be either "word" or "char".
:returns:
A list of text chunks.
"""
chunks = []
step = self.split_length - self.split_overlap
if split_units == "word":
words = re.findall(r"\S+|\s+", text)
current_chunk = []
current_length = 0
for word in words:
if word != " ":
current_chunk.append(word)
current_length += 1
if current_length == step and current_chunk:
chunks.append("".join(current_chunk))
current_chunk = []
current_length = 0
else:
current_chunk.append(word)
if current_chunk:
chunks.append("".join(current_chunk))
else:
for i in range(0, self._chunk_length(text), step):
chunks.append(text[i : i + self.split_length])
return chunks
def _add_overlap_info(self, curr_pos: int, new_doc: Document, new_docs: List[Document]) -> None:
prev_doc = new_docs[-1]
overlap_length = self._chunk_length(prev_doc.content) - (curr_pos - prev_doc.meta["split_idx_start"]) # type: ignore
if overlap_length > 0:
prev_doc.meta["_split_overlap"].append({"doc_id": new_doc.id, "range": (0, overlap_length)})
new_doc.meta["_split_overlap"].append(
{
"doc_id": prev_doc.id,
"range": (
self._chunk_length(prev_doc.content) - overlap_length, # type: ignore
self._chunk_length(prev_doc.content), # type: ignore
),
}
)
def _run_one(self, doc: Document) -> List[Document]:
chunks = self._chunk_text(doc.content) # type: ignore # the caller already checks for a non-empty doc.content
chunks = chunks[:-1] if len(chunks[-1]) == 0 else chunks # remove last empty chunk if it exists
current_position = 0
current_page = 1
new_docs: List[Document] = []
for split_nr, chunk in enumerate(chunks):
new_doc = Document(content=chunk, meta=deepcopy(doc.meta))
new_doc.meta["split_id"] = split_nr
new_doc.meta["split_idx_start"] = current_position
new_doc.meta["_split_overlap"] = [] if self.split_overlap > 0 else None
# add overlap information to the previous and current doc
if split_nr > 0 and self.split_overlap > 0:
self._add_overlap_info(current_position, new_doc, new_docs)
# count page breaks in the chunk
current_page += chunk.count("\f")
# if there are consecutive page breaks at the end with no more text, adjust the page number
# e.g: "text\f\f\f" -> 3 page breaks, but current_page should be 1
consecutive_page_breaks = len(chunk) - len(chunk.rstrip("\f"))
if consecutive_page_breaks > 0:
new_doc.meta["page_number"] = current_page - consecutive_page_breaks
else:
new_doc.meta["page_number"] = current_page
# keep the new chunk doc and update the current position
new_docs.append(new_doc)
current_position += len(chunk) - (self.split_overlap if split_nr < len(chunks) - 1 else 0)
return new_docs
@component.output_types(documents=List[Document])
def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
"""
Split a list of documents into documents with smaller chunks of text.
:param documents: List of Documents to split.
:returns:
A dictionary containing a key "documents" with a List of Documents with smaller chunks of text corresponding
to the input documents.
"""
docs = []
for doc in documents:
if not doc.content or doc.content == "":
logger.warning("Document ID {doc_id} has an empty content. Skipping this document.", doc_id=doc.id)
continue
docs.extend(self._run_one(doc))
return {"documents": docs}
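
For a quick sanity check of the component added above, here is a minimal usage sketch (not part of the diff); the text, parameter values, and printed fields are illustrative only.

```python
from haystack import Document
from haystack.components.preprocessors import RecursiveDocumentSplitter

# word-based splitting with a small overlap; the "sentence" separator needs warm_up() to load the NLTK tokenizer
splitter = RecursiveDocumentSplitter(
    split_length=10, split_overlap=2, split_unit="word", separators=["\n\n", "sentence", "\n", " "]
)
splitter.warm_up()

doc = Document(content="This is sentence one. This is sentence two. This is sentence three.")
for chunk in splitter.run([doc])["documents"]:
    # each chunk carries its position and, because split_overlap > 0, its overlap metadata
    print(chunk.meta["split_id"], chunk.meta["split_idx_start"], chunk.meta["_split_overlap"], repr(chunk.content))
```

With `split_overlap` greater than zero, the `_split_overlap` entries record which neighbouring chunks share text and over which ranges.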


@@ -135,6 +135,7 @@ class SentenceSplitter: # pylint: disable=too-few-public-methods
Currently supported languages are: en, de.
:param keep_white_spaces: If True, the tokenizer will keep white spaces between sentences.
"""
nltk_imports.check()
self.language = language
self.sentence_tokenizer = load_sentence_tokenizer(language, keep_white_spaces=keep_white_spaces)
self.use_split_rules = use_split_rules


@@ -0,0 +1,4 @@
---
features:
- |
Adding a `RecursiveDocumentSplitter` component, which uses a set of separators to split text recursively. It attempts to divide the text using the first separator; if the resulting chunks are still larger than the specified size, it moves to the next separator in the list.
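
To make the separator-precedence behaviour described in this release note concrete, here is a small illustrative sketch (not part of the release note); the text and lengths are made up.

```python
from haystack import Document
from haystack.components.preprocessors import RecursiveDocumentSplitter

# "\n\n" is tried first; any paragraph still longer than 15 words is then re-split on "\n", and finally on " "
splitter = RecursiveDocumentSplitter(split_length=15, split_overlap=0, split_unit="word", separators=["\n\n", "\n", " "])
text = (
    "Short paragraph.\n\n"
    "A much longer paragraph that does not fit into a single fifteen word chunk and is therefore re-split on spaces."
)
chunks = splitter.run([Document(content=text)])["documents"]
print([chunk.content for chunk in chunks])
```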


@@ -0,0 +1,818 @@
import re
import pytest
from pytest import LogCaptureFixture
from haystack import Document, Pipeline
from haystack.components.preprocessors.recursive_splitter import RecursiveDocumentSplitter
from haystack.components.preprocessors.sentence_tokenizer import SentenceSplitter
def test_get_custom_sentence_tokenizer_success():
tokenizer = RecursiveDocumentSplitter._get_custom_sentence_tokenizer({})
assert isinstance(tokenizer, SentenceSplitter)
def test_init_with_negative_overlap():
with pytest.raises(ValueError):
_ = RecursiveDocumentSplitter(split_length=20, split_overlap=-1, separators=["."])
def test_init_with_overlap_greater_than_chunk_size():
with pytest.raises(ValueError):
_ = RecursiveDocumentSplitter(split_length=10, split_overlap=15, separators=["."])
def test_init_with_invalid_separators():
with pytest.raises(ValueError):
_ = RecursiveDocumentSplitter(separators=[".", 2])
def test_init_with_negative_split_length():
with pytest.raises(ValueError):
_ = RecursiveDocumentSplitter(split_length=-1, separators=["."])
def test_apply_overlap_no_overlap():
# Test the case where there is no overlap between chunks
splitter = RecursiveDocumentSplitter(split_length=20, split_overlap=0, separators=["."], split_unit="char")
chunks = ["chunk1", "chunk2", "chunk3"]
result = splitter._apply_overlap(chunks)
assert result == ["chunk1", "chunk2", "chunk3"]
def test_apply_overlap_with_overlap():
# Test the case where there is overlap between chunks
splitter = RecursiveDocumentSplitter(split_length=20, split_overlap=4, separators=["."], split_unit="char")
chunks = ["chunk1", "chunk2", "chunk3"]
result = splitter._apply_overlap(chunks)
assert result == ["chunk1", "unk1chunk2", "unk2chunk3"]
def test_apply_overlap_with_overlap_capturing_completely_previous_chunk(caplog):
splitter = RecursiveDocumentSplitter(split_length=20, split_overlap=6, separators=["."], split_unit="char")
chunks = ["chunk1", "chunk2", "chunk3", "chunk4"]
_ = splitter._apply_overlap(chunks)
assert (
"Overlap is the same as the previous chunk. Consider increasing the `split_length` parameter or decreasing the `split_overlap` parameter."
in caplog.text
)
def test_apply_overlap_single_chunk():
# Test the case where there is only one chunk
splitter = RecursiveDocumentSplitter(split_length=20, split_overlap=3, separators=["."], split_unit="char")
chunks = ["chunk1"]
result = splitter._apply_overlap(chunks)
assert result == ["chunk1"]
def test_chunk_text_smaller_than_chunk_size():
splitter = RecursiveDocumentSplitter(split_length=20, split_overlap=0, separators=["."])
text = "small text"
chunks = splitter._chunk_text(text)
assert len(chunks) == 1
assert chunks[0] == text
def test_chunk_text_by_period():
splitter = RecursiveDocumentSplitter(split_length=20, split_overlap=0, separators=["."], split_unit="char")
text = "This is a test. Another sentence. And one more."
chunks = splitter._chunk_text(text)
assert len(chunks) == 3
assert chunks[0] == "This is a test."
assert chunks[1] == " Another sentence."
assert chunks[2] == " And one more."
def test_run_multiple_new_lines_unit_char():
splitter = RecursiveDocumentSplitter(split_length=18, separators=["\n\n", "\n"], split_unit="char")
text = "This is a test.\n\n\nAnother test.\n\n\n\nFinal test."
doc = Document(content=text)
chunks = splitter.run([doc])["documents"]
assert chunks[0].content == "This is a test.\n\n"
assert chunks[1].content == "\nAnother test.\n\n\n\n"
assert chunks[2].content == "Final test."
def test_run_empty_documents(caplog: LogCaptureFixture):
splitter = RecursiveDocumentSplitter(split_length=20, split_overlap=0, separators=["."])
empty_doc = Document(content="")
doc_chunks = splitter.run([empty_doc])
doc_chunks = doc_chunks["documents"]
assert len(doc_chunks) == 0
assert "has an empty content. Skipping this document." in caplog.text
def test_run_using_custom_sentence_tokenizer():
"""
This test includes abbreviations that are not handled by the simple sentence tokenizer based on "." and requires a
more sophisticated sentence tokenizer like the one provided by NLTK.
"""
splitter = RecursiveDocumentSplitter(
split_length=400,
split_overlap=0,
split_unit="char",
separators=["\n\n", "\n", "sentence", " "],
sentence_splitter_params={"language": "en", "use_split_rules": True, "keep_white_spaces": False},
)
splitter.warm_up()
text = """Artificial intelligence (AI) - Introduction
AI, in its broadest sense, is intelligence exhibited by machines, particularly computer systems.
AI technology is widely used throughout industry, government, and science. Some high-profile applications include advanced web search engines (e.g., Google Search); recommendation systems (used by YouTube, Amazon, and Netflix); interacting via human speech (e.g., Google Assistant, Siri, and Alexa); autonomous vehicles (e.g., Waymo); generative and creative tools (e.g., ChatGPT and AI art); and superhuman play and analysis in strategy games (e.g., chess and Go).""" # noqa: E501
chunks = splitter.run([Document(content=text)])
chunks = chunks["documents"]
assert len(chunks) == 4
assert chunks[0].content == "Artificial intelligence (AI) - Introduction\n\n"
assert (
chunks[1].content
== "AI, in its broadest sense, is intelligence exhibited by machines, particularly computer systems.\n"
) # noqa: E501
assert chunks[2].content == "AI technology is widely used throughout industry, government, and science." # noqa: E501
assert (
chunks[3].content
== "Some high-profile applications include advanced web search engines (e.g., Google Search); recommendation systems (used by YouTube, Amazon, and Netflix); interacting via human speech (e.g., Google Assistant, Siri, and Alexa); autonomous vehicles (e.g., Waymo); generative and creative tools (e.g., ChatGPT and AI art); and superhuman play and analysis in strategy games (e.g., chess and Go)."
) # noqa: E501
def test_run_split_by_dot_count_page_breaks_split_unit_char() -> None:
document_splitter = RecursiveDocumentSplitter(separators=["."], split_length=30, split_overlap=0, split_unit="char")
text = (
"Sentence on page 1. Another on page 1.\fSentence on page 2. Another on page 2.\f"
"Sentence on page 3. Another on page 3.\f\f Sentence on page 5."
)
documents = document_splitter.run(documents=[Document(content=text)])["documents"]
assert len(documents) == 7
assert documents[0].content == "Sentence on page 1."
assert documents[0].meta["page_number"] == 1
assert documents[0].meta["split_id"] == 0
assert documents[0].meta["split_idx_start"] == text.index(documents[0].content)
assert documents[1].content == " Another on page 1."
assert documents[1].meta["page_number"] == 1
assert documents[1].meta["split_id"] == 1
assert documents[1].meta["split_idx_start"] == text.index(documents[1].content)
assert documents[2].content == "\fSentence on page 2."
assert documents[2].meta["page_number"] == 2
assert documents[2].meta["split_id"] == 2
assert documents[2].meta["split_idx_start"] == text.index(documents[2].content)
assert documents[3].content == " Another on page 2."
assert documents[3].meta["page_number"] == 2
assert documents[3].meta["split_id"] == 3
assert documents[3].meta["split_idx_start"] == text.index(documents[3].content)
assert documents[4].content == "\fSentence on page 3."
assert documents[4].meta["page_number"] == 3
assert documents[4].meta["split_id"] == 4
assert documents[4].meta["split_idx_start"] == text.index(documents[4].content)
assert documents[5].content == " Another on page 3."
assert documents[5].meta["page_number"] == 3
assert documents[5].meta["split_id"] == 5
assert documents[5].meta["split_idx_start"] == text.index(documents[5].content)
assert documents[6].content == "\f\f Sentence on page 5."
assert documents[6].meta["page_number"] == 5
assert documents[6].meta["split_id"] == 6
assert documents[6].meta["split_idx_start"] == text.index(documents[6].content)
def test_run_split_by_word_count_page_breaks_split_unit_char():
splitter = RecursiveDocumentSplitter(split_length=19, split_overlap=0, separators=[" "], split_unit="char")
text = "This is some text. \f This text is on another page. \f This is the last pag3."
doc = Document(content=text)
doc_chunks = splitter.run([doc])
doc_chunks = doc_chunks["documents"]
assert len(doc_chunks) == 5
assert doc_chunks[0].content == "This is some text. "
assert doc_chunks[0].meta["page_number"] == 1
assert doc_chunks[0].meta["split_id"] == 0
assert doc_chunks[0].meta["split_idx_start"] == text.index(doc_chunks[0].content)
assert doc_chunks[1].content == "\f This text is on "
assert doc_chunks[1].meta["page_number"] == 2
assert doc_chunks[1].meta["split_id"] == 1
assert doc_chunks[1].meta["split_idx_start"] == text.index(doc_chunks[1].content)
assert doc_chunks[2].content == "another page. \f "
assert doc_chunks[2].meta["page_number"] == 3
assert doc_chunks[2].meta["split_id"] == 2
assert doc_chunks[2].meta["split_idx_start"] == text.index(doc_chunks[2].content)
assert doc_chunks[3].content == "This is the last "
assert doc_chunks[3].meta["page_number"] == 3
assert doc_chunks[3].meta["split_id"] == 3
assert doc_chunks[3].meta["split_idx_start"] == text.index(doc_chunks[3].content)
assert doc_chunks[4].content == "pag3."
assert doc_chunks[4].meta["page_number"] == 3
assert doc_chunks[4].meta["split_id"] == 4
assert doc_chunks[4].meta["split_idx_start"] == text.index(doc_chunks[4].content)
def test_run_split_by_page_break_count_page_breaks() -> None:
document_splitter = RecursiveDocumentSplitter(
separators=["\f"], split_length=50, split_overlap=0, split_unit="char"
)
text = (
"Sentence on page 1. Another on page 1.\fSentence on page 2. Another on page 2.\f"
"Sentence on page 3. Another on page 3.\f\f Sentence on page 5."
)
documents = document_splitter.run(documents=[Document(content=text)])
chunks_docs = documents["documents"]
assert len(chunks_docs) == 4
assert chunks_docs[0].content == "Sentence on page 1. Another on page 1.\f"
assert chunks_docs[0].meta["page_number"] == 1
assert chunks_docs[0].meta["split_id"] == 0
assert chunks_docs[0].meta["split_idx_start"] == text.index(chunks_docs[0].content)
assert chunks_docs[1].content == "Sentence on page 2. Another on page 2.\f"
assert chunks_docs[1].meta["page_number"] == 2
assert chunks_docs[1].meta["split_id"] == 1
assert chunks_docs[1].meta["split_idx_start"] == text.index(chunks_docs[1].content)
assert chunks_docs[2].content == "Sentence on page 3. Another on page 3.\f\f"
assert chunks_docs[2].meta["page_number"] == 3
assert chunks_docs[2].meta["split_id"] == 2
assert chunks_docs[2].meta["split_idx_start"] == text.index(chunks_docs[2].content)
assert chunks_docs[3].content == " Sentence on page 5."
assert chunks_docs[3].meta["page_number"] == 5
assert chunks_docs[3].meta["split_id"] == 3
assert chunks_docs[3].meta["split_idx_start"] == text.index(chunks_docs[3].content)
def test_run_split_by_new_line_count_page_breaks_split_unit_char() -> None:
document_splitter = RecursiveDocumentSplitter(
separators=["\n"], split_length=21, split_overlap=0, split_unit="char"
)
text = (
"Sentence on page 1.\nAnother on page 1.\n\f"
"Sentence on page 2.\nAnother on page 2.\n\f"
"Sentence on page 3.\nAnother on page 3.\n\f\f"
"Sentence on page 5."
)
documents = document_splitter.run(documents=[Document(content=text)])
chunks_docs = documents["documents"]
assert len(chunks_docs) == 7
assert chunks_docs[0].content == "Sentence on page 1.\n"
assert chunks_docs[0].meta["page_number"] == 1
assert chunks_docs[0].meta["split_id"] == 0
assert chunks_docs[0].meta["split_idx_start"] == text.index(chunks_docs[0].content)
assert chunks_docs[1].content == "Another on page 1.\n"
assert chunks_docs[1].meta["page_number"] == 1
assert chunks_docs[1].meta["split_id"] == 1
assert chunks_docs[1].meta["split_idx_start"] == text.index(chunks_docs[1].content)
assert chunks_docs[2].content == "\fSentence on page 2.\n"
assert chunks_docs[2].meta["page_number"] == 2
assert chunks_docs[2].meta["split_id"] == 2
assert chunks_docs[2].meta["split_idx_start"] == text.index(chunks_docs[2].content)
assert chunks_docs[3].content == "Another on page 2.\n"
assert chunks_docs[3].meta["page_number"] == 2
assert chunks_docs[3].meta["split_id"] == 3
assert chunks_docs[3].meta["split_idx_start"] == text.index(chunks_docs[3].content)
assert chunks_docs[4].content == "\fSentence on page 3.\n"
assert chunks_docs[4].meta["page_number"] == 3
assert chunks_docs[4].meta["split_id"] == 4
assert chunks_docs[4].meta["split_idx_start"] == text.index(chunks_docs[4].content)
assert chunks_docs[5].content == "Another on page 3.\n"
assert chunks_docs[5].meta["page_number"] == 3
assert chunks_docs[5].meta["split_id"] == 5
assert chunks_docs[5].meta["split_idx_start"] == text.index(chunks_docs[5].content)
assert chunks_docs[6].content == "\f\fSentence on page 5."
assert chunks_docs[6].meta["page_number"] == 5
assert chunks_docs[6].meta["split_id"] == 6
assert chunks_docs[6].meta["split_idx_start"] == text.index(chunks_docs[6].content)
def test_run_split_by_sentence_count_page_breaks_split_unit_char() -> None:
document_splitter = RecursiveDocumentSplitter(
separators=["sentence"], split_length=28, split_overlap=0, split_unit="char"
)
document_splitter.warm_up()
text = (
"Sentence on page 1. Another on page 1.\fSentence on page 2. Another on page 2.\f"
"Sentence on page 3. Another on page 3.\f\fSentence on page 5."
)
documents = document_splitter.run(documents=[Document(content=text)])
chunks_docs = documents["documents"]
assert len(chunks_docs) == 7
assert chunks_docs[0].content == "Sentence on page 1. "
assert chunks_docs[0].meta["page_number"] == 1
assert chunks_docs[0].meta["split_id"] == 0
assert chunks_docs[0].meta["split_idx_start"] == text.index(chunks_docs[0].content)
assert chunks_docs[1].content == "Another on page 1.\f"
assert chunks_docs[1].meta["page_number"] == 1
assert chunks_docs[1].meta["split_id"] == 1
assert chunks_docs[1].meta["split_idx_start"] == text.index(chunks_docs[1].content)
assert chunks_docs[2].content == "Sentence on page 2. "
assert chunks_docs[2].meta["page_number"] == 2
assert chunks_docs[2].meta["split_id"] == 2
assert chunks_docs[2].meta["split_idx_start"] == text.index(chunks_docs[2].content)
assert chunks_docs[3].content == "Another on page 2.\f"
assert chunks_docs[3].meta["page_number"] == 2
assert chunks_docs[3].meta["split_id"] == 3
assert chunks_docs[3].meta["split_idx_start"] == text.index(chunks_docs[3].content)
assert chunks_docs[4].content == "Sentence on page 3. "
assert chunks_docs[4].meta["page_number"] == 3
assert chunks_docs[4].meta["split_id"] == 4
assert chunks_docs[4].meta["split_idx_start"] == text.index(chunks_docs[4].content)
assert chunks_docs[5].content == "Another on page 3.\f\f"
assert chunks_docs[5].meta["page_number"] == 3
assert chunks_docs[5].meta["split_id"] == 5
assert chunks_docs[5].meta["split_idx_start"] == text.index(chunks_docs[5].content)
assert chunks_docs[6].content == "Sentence on page 5."
assert chunks_docs[6].meta["page_number"] == 5
assert chunks_docs[6].meta["split_id"] == 6
assert chunks_docs[6].meta["split_idx_start"] == text.index(chunks_docs[6].content)
def test_run_split_document_with_overlap_character_unit():
splitter = RecursiveDocumentSplitter(split_length=20, split_overlap=10, separators=["."], split_unit="char")
text = """A simple sentence1. A bright sentence2. A clever sentence3"""
doc = Document(content=text)
doc_chunks = splitter.run([doc])
doc_chunks = doc_chunks["documents"]
assert len(doc_chunks) == 5
assert doc_chunks[0].content == "A simple sentence1."
assert doc_chunks[0].meta["split_id"] == 0
assert doc_chunks[0].meta["split_idx_start"] == text.index(doc_chunks[0].content)
assert doc_chunks[0].meta["_split_overlap"] == [{"doc_id": doc_chunks[1].id, "range": (0, 10)}]
assert doc_chunks[1].content == "sentence1. A bright "
assert doc_chunks[1].meta["split_id"] == 1
assert doc_chunks[1].meta["split_idx_start"] == text.index(doc_chunks[1].content)
assert doc_chunks[1].meta["_split_overlap"] == [
{"doc_id": doc_chunks[0].id, "range": (9, 19)},
{"doc_id": doc_chunks[2].id, "range": (0, 10)},
]
assert doc_chunks[2].content == " A bright sentence2."
assert doc_chunks[2].meta["split_id"] == 2
assert doc_chunks[2].meta["split_idx_start"] == text.index(doc_chunks[2].content)
assert doc_chunks[2].meta["_split_overlap"] == [
{"doc_id": doc_chunks[1].id, "range": (10, 20)},
{"doc_id": doc_chunks[3].id, "range": (0, 10)},
]
assert doc_chunks[3].content == "sentence2. A clever "
assert doc_chunks[3].meta["split_id"] == 3
assert doc_chunks[3].meta["split_idx_start"] == text.index(doc_chunks[3].content)
assert doc_chunks[3].meta["_split_overlap"] == [
{"doc_id": doc_chunks[2].id, "range": (10, 20)},
{"doc_id": doc_chunks[4].id, "range": (0, 10)},
]
assert doc_chunks[4].content == " A clever sentence3"
assert doc_chunks[4].meta["split_id"] == 4
assert doc_chunks[4].meta["split_idx_start"] == text.index(doc_chunks[4].content)
assert doc_chunks[4].meta["_split_overlap"] == [{"doc_id": doc_chunks[3].id, "range": (10, 20)}]
def test_run_separator_exists_but_split_length_too_small_fall_back_to_character_chunking():
splitter = RecursiveDocumentSplitter(separators=[" "], split_length=2, split_unit="char")
doc = Document(content="This is some text")
result = splitter.run(documents=[doc])
assert len(result["documents"]) == 10
for doc in result["documents"]:
if re.escape(doc.content) not in ["\ "]:
assert len(doc.content) == 2
def test_run_fallback_to_character_chunking_by_default_length_too_short():
text = "abczdefzghizjkl"
separators = ["\n\n", "\n", "z"]
splitter = RecursiveDocumentSplitter(split_length=2, separators=separators, split_unit="char")
doc = Document(content=text)
chunks = splitter.run([doc])["documents"]
for chunk in chunks:
assert len(chunk.content) <= 2
def test_run_fallback_to_word_chunking_by_default_length_too_short():
text = "This is some text. This is some more text, and even more text."
separators = ["\n\n", "\n", "."]
splitter = RecursiveDocumentSplitter(split_length=2, separators=separators, split_unit="word")
doc = Document(content=text)
chunks = splitter.run([doc])["documents"]
for chunk in chunks:
assert splitter._chunk_length(chunk.content) <= 2
def test_run_custom_sentence_tokenizer_document_and_overlap_char_unit():
"""Test that RecursiveDocumentSplitter works correctly with custom sentence tokenizer and overlap"""
splitter = RecursiveDocumentSplitter(split_length=25, split_overlap=10, separators=["sentence"], split_unit="char")
text = "This is sentence one. This is sentence two. This is sentence three."
splitter.warm_up()
doc = Document(content=text)
doc_chunks = splitter.run([doc])["documents"]
assert len(doc_chunks) == 4
assert doc_chunks[0].content == "This is sentence one. "
assert doc_chunks[0].meta["split_id"] == 0
assert doc_chunks[0].meta["split_idx_start"] == text.index(doc_chunks[0].content)
assert doc_chunks[0].meta["_split_overlap"] == [{"doc_id": doc_chunks[1].id, "range": (0, 10)}]
assert doc_chunks[1].content == "ence one. This is sentenc"
assert doc_chunks[1].meta["split_id"] == 1
assert doc_chunks[1].meta["split_idx_start"] == text.index(doc_chunks[1].content)
assert doc_chunks[1].meta["_split_overlap"] == [
{"doc_id": doc_chunks[0].id, "range": (12, 22)},
{"doc_id": doc_chunks[2].id, "range": (0, 10)},
]
assert doc_chunks[2].content == "is sentence two. This is "
assert doc_chunks[2].meta["split_id"] == 2
assert doc_chunks[2].meta["split_idx_start"] == text.index(doc_chunks[2].content)
assert doc_chunks[2].meta["_split_overlap"] == [
{"doc_id": doc_chunks[1].id, "range": (15, 25)},
{"doc_id": doc_chunks[3].id, "range": (0, 10)},
]
assert doc_chunks[3].content == ". This is sentence three."
assert doc_chunks[3].meta["split_id"] == 3
assert doc_chunks[3].meta["split_idx_start"] == text.index(doc_chunks[3].content)
assert doc_chunks[3].meta["_split_overlap"] == [{"doc_id": doc_chunks[2].id, "range": (15, 25)}]
def test_run_split_by_dot_count_page_breaks_word_unit() -> None:
document_splitter = RecursiveDocumentSplitter(separators=["."], split_length=4, split_overlap=0, split_unit="word")
text = (
"Sentence on page 1. Another on page 1.\fSentence on page 2. Another on page 2.\f"
"Sentence on page 3. Another on page 3.\f\f Sentence on page 5."
)
documents = document_splitter.run(documents=[Document(content=text)])["documents"]
assert len(documents) == 8
assert documents[0].content == "Sentence on page 1."
assert documents[0].meta["page_number"] == 1
assert documents[0].meta["split_id"] == 0
assert documents[0].meta["split_idx_start"] == text.index(documents[0].content)
assert documents[1].content == " Another on page 1."
assert documents[1].meta["page_number"] == 1
assert documents[1].meta["split_id"] == 1
assert documents[1].meta["split_idx_start"] == text.index(documents[1].content)
assert documents[2].content == "\fSentence on page 2."
assert documents[2].meta["page_number"] == 2
assert documents[2].meta["split_id"] == 2
assert documents[2].meta["split_idx_start"] == text.index(documents[2].content)
assert documents[3].content == " Another on page 2."
assert documents[3].meta["page_number"] == 2
assert documents[3].meta["split_id"] == 3
assert documents[3].meta["split_idx_start"] == text.index(documents[3].content)
assert documents[4].content == "\fSentence on page 3."
assert documents[4].meta["page_number"] == 3
assert documents[4].meta["split_id"] == 4
assert documents[4].meta["split_idx_start"] == text.index(documents[4].content)
assert documents[5].content == " Another on page 3."
assert documents[5].meta["page_number"] == 3
assert documents[5].meta["split_id"] == 5
assert documents[5].meta["split_idx_start"] == text.index(documents[5].content)
assert documents[6].content == "\f\f Sentence on page"
assert documents[6].meta["page_number"] == 5
assert documents[6].meta["split_id"] == 6
assert documents[6].meta["split_idx_start"] == text.index(documents[6].content)
assert documents[7].content == " 5."
assert documents[7].meta["page_number"] == 5
assert documents[7].meta["split_id"] == 7
assert documents[7].meta["split_idx_start"] == text.index(documents[7].content)
def test_run_split_by_word_count_page_breaks_word_unit():
splitter = RecursiveDocumentSplitter(split_length=4, split_overlap=0, separators=[" "], split_unit="word")
text = "This is some text. \f This text is on another page. \f This is the last pag3."
doc = Document(content=text)
doc_chunks = splitter.run([doc])
doc_chunks = doc_chunks["documents"]
assert len(doc_chunks) == 5
assert doc_chunks[0].content == "This is some text. "
assert doc_chunks[0].meta["page_number"] == 1
assert doc_chunks[0].meta["split_id"] == 0
assert doc_chunks[0].meta["split_idx_start"] == text.index(doc_chunks[0].content)
assert doc_chunks[1].content == "\f This text is "
assert doc_chunks[1].meta["page_number"] == 2
assert doc_chunks[1].meta["split_id"] == 1
assert doc_chunks[1].meta["split_idx_start"] == text.index(doc_chunks[1].content)
assert doc_chunks[2].content == "on another page. \f "
assert doc_chunks[2].meta["page_number"] == 3
assert doc_chunks[2].meta["split_id"] == 2
assert doc_chunks[2].meta["split_idx_start"] == text.index(doc_chunks[2].content)
assert doc_chunks[3].content == "This is the last "
assert doc_chunks[3].meta["page_number"] == 3
assert doc_chunks[3].meta["split_id"] == 3
assert doc_chunks[3].meta["split_idx_start"] == text.index(doc_chunks[3].content)
assert doc_chunks[4].content == "pag3."
assert doc_chunks[4].meta["page_number"] == 3
assert doc_chunks[4].meta["split_id"] == 4
assert doc_chunks[4].meta["split_idx_start"] == text.index(doc_chunks[4].content)
def test_run_split_by_page_break_count_page_breaks_word_unit() -> None:
document_splitter = RecursiveDocumentSplitter(separators=["\f"], split_length=8, split_overlap=0, split_unit="word")
text = (
"Sentence on page 1. Another on page 1.\fSentence on page 2. Another on page 2.\f"
"Sentence on page 3. Another on page 3.\f\f Sentence on page 5."
)
documents = document_splitter.run(documents=[Document(content=text)])
chunks_docs = documents["documents"]
assert len(chunks_docs) == 4
assert chunks_docs[0].content == "Sentence on page 1. Another on page 1.\f"
assert chunks_docs[0].meta["page_number"] == 1
assert chunks_docs[0].meta["split_id"] == 0
assert chunks_docs[0].meta["split_idx_start"] == text.index(chunks_docs[0].content)
assert chunks_docs[1].content == "Sentence on page 2. Another on page 2.\f"
assert chunks_docs[1].meta["page_number"] == 2
assert chunks_docs[1].meta["split_id"] == 1
assert chunks_docs[1].meta["split_idx_start"] == text.index(chunks_docs[1].content)
assert chunks_docs[2].content == "Sentence on page 3. Another on page 3.\f"
assert chunks_docs[2].meta["page_number"] == 3
assert chunks_docs[2].meta["split_id"] == 2
assert chunks_docs[2].meta["split_idx_start"] == text.index(chunks_docs[2].content)
assert chunks_docs[3].content == "\f Sentence on page 5."
assert chunks_docs[3].meta["page_number"] == 5
assert chunks_docs[3].meta["split_id"] == 3
assert chunks_docs[3].meta["split_idx_start"] == text.index(chunks_docs[3].content)
def test_run_split_by_new_line_count_page_breaks_word_unit() -> None:
document_splitter = RecursiveDocumentSplitter(separators=["\n"], split_length=4, split_overlap=0, split_unit="word")
text = (
"Sentence on page 1.\nAnother on page 1.\n\f"
"Sentence on page 2.\nAnother on page 2.\n\f"
"Sentence on page 3.\nAnother on page 3.\n\f\f"
"Sentence on page 5."
)
documents = document_splitter.run(documents=[Document(content=text)])
chunks_docs = documents["documents"]
assert len(chunks_docs) == 7
assert chunks_docs[0].content == "Sentence on page 1.\n"
assert chunks_docs[0].meta["page_number"] == 1
assert chunks_docs[0].meta["split_id"] == 0
assert chunks_docs[0].meta["split_idx_start"] == text.index(chunks_docs[0].content)
assert chunks_docs[1].content == "Another on page 1.\n"
assert chunks_docs[1].meta["page_number"] == 1
assert chunks_docs[1].meta["split_id"] == 1
assert chunks_docs[1].meta["split_idx_start"] == text.index(chunks_docs[1].content)
assert chunks_docs[2].content == "\fSentence on page 2.\n"
assert chunks_docs[2].meta["page_number"] == 2
assert chunks_docs[2].meta["split_id"] == 2
assert chunks_docs[2].meta["split_idx_start"] == text.index(chunks_docs[2].content)
assert chunks_docs[3].content == "Another on page 2.\n"
assert chunks_docs[3].meta["page_number"] == 2
assert chunks_docs[3].meta["split_id"] == 3
assert chunks_docs[3].meta["split_idx_start"] == text.index(chunks_docs[3].content)
assert chunks_docs[4].content == "\fSentence on page 3.\n"
assert chunks_docs[4].meta["page_number"] == 3
assert chunks_docs[4].meta["split_id"] == 4
assert chunks_docs[4].meta["split_idx_start"] == text.index(chunks_docs[4].content)
assert chunks_docs[5].content == "Another on page 3.\n"
assert chunks_docs[5].meta["page_number"] == 3
assert chunks_docs[5].meta["split_id"] == 5
assert chunks_docs[5].meta["split_idx_start"] == text.index(chunks_docs[5].content)
assert chunks_docs[6].content == "\f\fSentence on page 5."
assert chunks_docs[6].meta["page_number"] == 5
assert chunks_docs[6].meta["split_id"] == 6
assert chunks_docs[6].meta["split_idx_start"] == text.index(chunks_docs[6].content)
def test_run_split_by_sentence_count_page_breaks_word_unit() -> None:
document_splitter = RecursiveDocumentSplitter(
separators=["sentence"], split_length=7, split_overlap=0, split_unit="word"
)
document_splitter.warm_up()
text = (
"Sentence on page 1. Another on page 1.\fSentence on page 2. Another on page 2.\f"
"Sentence on page 3. Another on page 3.\f\fSentence on page 5."
)
documents = document_splitter.run(documents=[Document(content=text)])
chunks_docs = documents["documents"]
assert len(chunks_docs) == 7
assert chunks_docs[0].content == "Sentence on page 1. "
assert chunks_docs[0].meta["page_number"] == 1
assert chunks_docs[0].meta["split_id"] == 0
assert chunks_docs[0].meta["split_idx_start"] == text.index(chunks_docs[0].content)
assert chunks_docs[1].content == "Another on page 1.\f"
assert chunks_docs[1].meta["page_number"] == 1
assert chunks_docs[1].meta["split_id"] == 1
assert chunks_docs[1].meta["split_idx_start"] == text.index(chunks_docs[1].content)
assert chunks_docs[2].content == "Sentence on page 2. "
assert chunks_docs[2].meta["page_number"] == 2
assert chunks_docs[2].meta["split_id"] == 2
assert chunks_docs[2].meta["split_idx_start"] == text.index(chunks_docs[2].content)
assert chunks_docs[3].content == "Another on page 2.\f"
assert chunks_docs[3].meta["page_number"] == 2
assert chunks_docs[3].meta["split_id"] == 3
assert chunks_docs[3].meta["split_idx_start"] == text.index(chunks_docs[3].content)
assert chunks_docs[4].content == "Sentence on page 3. "
assert chunks_docs[4].meta["page_number"] == 3
assert chunks_docs[4].meta["split_id"] == 4
assert chunks_docs[4].meta["split_idx_start"] == text.index(chunks_docs[4].content)
assert chunks_docs[5].content == "Another on page 3.\f\f"
assert chunks_docs[5].meta["page_number"] == 3
assert chunks_docs[5].meta["split_id"] == 5
assert chunks_docs[5].meta["split_idx_start"] == text.index(chunks_docs[5].content)
assert chunks_docs[6].content == "Sentence on page 5."
assert chunks_docs[6].meta["page_number"] == 5
assert chunks_docs[6].meta["split_id"] == 6
assert chunks_docs[6].meta["split_idx_start"] == text.index(chunks_docs[6].content)
def test_run_split_by_sentence_tokenizer_document_and_overlap_word_unit_no_overlap():
splitter = RecursiveDocumentSplitter(split_length=4, split_overlap=0, separators=["."], split_unit="word")
text = "This is sentence one. This is sentence two. This is sentence three."
chunks = splitter.run([Document(content=text)])["documents"]
assert len(chunks) == 3
assert chunks[0].content == "This is sentence one."
assert chunks[1].content == " This is sentence two."
assert chunks[2].content == " This is sentence three."
def test_run_split_by_dot_and_overlap_1_word_unit():
splitter = RecursiveDocumentSplitter(split_length=4, split_overlap=1, separators=["."], split_unit="word")
text = "This is sentence one. This is sentence two. This is sentence three. This is sentence four."
chunks = splitter.run([Document(content=text)])["documents"]
assert len(chunks) == 5
assert chunks[0].content == "This is sentence one."
assert chunks[1].content == "one. This is sentence"
assert chunks[2].content == "sentence two. This is"
assert chunks[3].content == "is sentence three. This"
assert chunks[4].content == "This is sentence four."
def test_run_trigger_dealing_with_remaining_word_larger_than_split_length():
splitter = RecursiveDocumentSplitter(split_length=3, split_overlap=2, separators=["."], split_unit="word")
text = """A simple sentence1. A bright sentence2. A clever sentence3"""
doc = Document(content=text)
chunks = splitter.run([doc])["documents"]
assert len(chunks) == 7
assert chunks[0].content == "A simple sentence1."
assert chunks[1].content == "simple sentence1. A"
assert chunks[2].content == "sentence1. A bright"
assert chunks[3].content == "A bright sentence2."
assert chunks[4].content == "bright sentence2. A"
assert chunks[5].content == "sentence2. A clever"
assert chunks[6].content == "A clever sentence3"
def test_run_trigger_dealing_with_remaining_char_larger_than_split_length():
splitter = RecursiveDocumentSplitter(split_length=20, split_overlap=15, separators=["."], split_unit="char")
text = """A simple sentence1. A bright sentence2. A clever sentence3"""
doc = Document(content=text)
chunks = splitter.run([doc])["documents"]
assert len(chunks) == 9
assert chunks[0].content == "A simple sentence1."
assert chunks[0].meta["split_id"] == 0
assert chunks[0].meta["split_idx_start"] == text.index(chunks[0].content)
assert chunks[0].meta["_split_overlap"] == [{"doc_id": chunks[1].id, "range": (0, 15)}]
assert chunks[1].content == "mple sentence1. A br"
assert chunks[1].meta["split_id"] == 1
assert chunks[1].meta["split_idx_start"] == text.index(chunks[1].content)
assert chunks[1].meta["_split_overlap"] == [
{"doc_id": chunks[0].id, "range": (4, 19)},
{"doc_id": chunks[2].id, "range": (0, 15)},
]
assert chunks[2].content == "sentence1. A bright "
assert chunks[2].meta["split_id"] == 2
assert chunks[2].meta["split_idx_start"] == text.index(chunks[2].content)
assert chunks[2].meta["_split_overlap"] == [
{"doc_id": chunks[1].id, "range": (5, 20)},
{"doc_id": chunks[3].id, "range": (0, 15)},
]
assert chunks[3].content == "nce1. A bright sente"
assert chunks[3].meta["split_id"] == 3
assert chunks[3].meta["split_idx_start"] == text.index(chunks[3].content)
assert chunks[3].meta["_split_overlap"] == [
{"doc_id": chunks[2].id, "range": (5, 20)},
{"doc_id": chunks[4].id, "range": (0, 15)},
]
assert chunks[4].content == " A bright sentence2."
assert chunks[4].meta["split_id"] == 4
assert chunks[4].meta["split_idx_start"] == text.index(chunks[4].content)
assert chunks[4].meta["_split_overlap"] == [
{"doc_id": chunks[3].id, "range": (5, 20)},
{"doc_id": chunks[5].id, "range": (0, 15)},
]
assert chunks[5].content == "ight sentence2. A cl"
assert chunks[5].meta["split_id"] == 5
assert chunks[5].meta["split_idx_start"] == text.index(chunks[5].content)
assert chunks[5].meta["_split_overlap"] == [
{"doc_id": chunks[4].id, "range": (5, 20)},
{"doc_id": chunks[6].id, "range": (0, 15)},
]
assert chunks[6].content == "sentence2. A clever "
assert chunks[6].meta["split_id"] == 6
assert chunks[6].meta["split_idx_start"] == text.index(chunks[6].content)
assert chunks[6].meta["_split_overlap"] == [
{"doc_id": chunks[5].id, "range": (5, 20)},
{"doc_id": chunks[7].id, "range": (0, 15)},
]
assert chunks[7].content == "nce2. A clever sente"
assert chunks[7].meta["split_id"] == 7
assert chunks[7].meta["split_idx_start"] == text.index(chunks[7].content)
assert chunks[7].meta["_split_overlap"] == [
{"doc_id": chunks[6].id, "range": (5, 20)},
{"doc_id": chunks[8].id, "range": (0, 15)},
]
assert chunks[8].content == " A clever sentence3"
assert chunks[8].meta["split_id"] == 8
assert chunks[8].meta["split_idx_start"] == text.index(chunks[8].content)
assert chunks[8].meta["_split_overlap"] == [{"doc_id": chunks[7].id, "range": (5, 20)}]
def test_run_custom_split_by_dot_and_overlap_3_char_unit():
document_splitter = RecursiveDocumentSplitter(separators=["."], split_length=4, split_overlap=0, split_unit="word")
text = "\x0c\x0c Sentence on page 5."
chunks = document_splitter._fall_back_to_fixed_chunking(text, split_units="word")
assert len(chunks) == 2
assert chunks[0] == "\x0c\x0c Sentence on page"
assert chunks[1] == " 5."
def test_run_serialization_in_pipeline():
pipeline = Pipeline()
pipeline.add_component("chunker", RecursiveDocumentSplitter(split_length=20, split_overlap=5, separators=["."]))
pipeline_dict = pipeline.dumps()
new_pipeline = Pipeline.loads(pipeline_dict)
assert pipeline_dict == new_pipeline.dumps()
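
For completeness, a minimal indexing-pipeline sketch (not part of this commit) showing the new splitter in front of a writer; the `DocumentWriter` and `InMemoryDocumentStore` imports are assumed from the standard Haystack 2.x packages, and the text and parameters are illustrative.

```python
from haystack import Document, Pipeline
from haystack.components.preprocessors import RecursiveDocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()

indexing = Pipeline()
# plain-text separators only, so no NLTK sentence tokenizer is needed
indexing.add_component(
    "splitter",
    RecursiveDocumentSplitter(split_length=100, split_overlap=10, split_unit="word", separators=["\n\n", "\n", ".", " "]),
)
indexing.add_component("writer", DocumentWriter(document_store=document_store))
indexing.connect("splitter.documents", "writer.documents")

indexing.run({"splitter": {"documents": [Document(content="Some long text to be chunked and indexed...")]}})
print(document_store.count_documents())
```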