"""Chunking objects not specific to a particular chunking strategy."""
from __future__ import annotations
import collections
import copy
from typing import Any, Callable, DefaultDict, Iterable, Iterator, cast
import regex
from typing_extensions import Self, TypeAlias
from unstructured.common.html_table import HtmlCell, HtmlRow, HtmlTable
from unstructured.documents.elements import (
CompositeElement,
ConsolidationStrategy,
Element,
ElementMetadata,
Table,
TableChunk,
Title,
)
from unstructured.utils import lazyproperty
# ================================================================================================
# MODEL
# ================================================================================================
CHUNK_MAX_CHARS_DEFAULT: int = 500
"""Hard-max chunk-length when no explicit value specified in `max_characters` argument.
Provided for reference only, for example so the ingest CLI can advertise the default value in its
UI. External chunking-related functions (e.g. in ingest or decorators) should use
`max_characters: int | None = None` and not apply this default themselves. Only
`ChunkingOptions.max_characters` should apply a default value.
"""
CHUNK_MULTI_PAGE_DEFAULT: bool = True
"""When False, respect page-boundaries (no two elements from different page in same chunk).
Only operative for "by_title" chunking strategy.
"""
BoundaryPredicate: TypeAlias = Callable[[Element], bool]
"""Detects when element represents crossing a semantic boundary like section or page."""
PreChunk: TypeAlias = "TablePreChunk | TextPreChunk"
"""The kind of object produced by a pre-chunker."""
TextAndHtml: TypeAlias = tuple[str, str]
# ================================================================================================
# CHUNKING OPTIONS
# ================================================================================================
class ChunkingOptions:
"""Specifies parameters of optional chunking behaviors.
Parameters
----------
max_characters
Hard-maximum text-length of chunk. A chunk longer than this will be split mid-text and be
emitted as two or more chunks.
new_after_n_chars
Preferred approximate chunk size. A chunk composed of elements totalling this size or
greater is considered "full" and will not be enlarged by adding another element, even if it
will fit within the remaining `max_characters` for that chunk. Defaults to `max_characters`
when not specified, which effectively disables this behavior. Specifying 0 for this
argument causes each element to appear in a chunk by itself (although an element with text
longer than `max_characters` will be still be split into two or more chunks).
combine_text_under_n_chars
Provides a way to "recombine" small chunks formed by breaking on a semantic boundary. Only
relevant for a chunking strategy that specifies higher-level semantic boundaries to be
respected, like "section" or "page". Recursively combines two adjacent pre-chunks when the
first pre-chunk is smaller than this threshold. "Recursively" here means the resulting
pre-chunk can be combined with the next pre-chunk if it is still under the length threshold.
Defaults to `max_characters` which combines chunks whenever space allows. Specifying 0 for
this argument suppresses combining of small chunks. Note this value is "capped" at the
`new_after_n_chars` value since a value higher than that would not change this parameter's
effect.
overlap
Specifies the length of a string ("tail") to be drawn from each chunk and prefixed to the
next chunk as a context-preserving mechanism. By default, this only applies to split-chunks
where an oversized element is divided into multiple chunks by text-splitting.
overlap_all
Default: `False`. When `True`, apply overlap between "normal" chunks formed from whole
elements and not subject to text-splitting. Use this with caution as it entails a certain
level of "pollution" of otherwise clean semantic chunk boundaries.
text_splitting_separators
A sequence of strings like `("\n", " ")` to be used as target separators during
text-splitting. Text-splitting only applies to splitting an oversized element into two or
more chunks. These separators are tried in the specified order until one is found in the
string to be split. The default separator is `""` which matches between any two characters.
This separator should not be specified in this sequence because it is always the separator
of last-resort. Note that because the separator is removed during text-splitting, only
whitespace character sequences are suitable.
"""
def __init__(self, **kwargs: Any):
self._kwargs = kwargs
@classmethod
def new(cls, **kwargs: Any) -> Self:
"""Return instance or raises `ValueError` on invalid arguments like overlap > max_chars."""
self = cls(**kwargs)
self._validate()
return self
@lazyproperty
def boundary_predicates(self) -> tuple[BoundaryPredicate, ...]:
"""The semantic-boundary detectors to be applied to break pre-chunks.
Overridden by sub-typs to provide semantic-boundary isolation behaviors.
"""
return ()
@lazyproperty
def combine_text_under_n_chars(self) -> int:
"""Combine two consecutive text pre-chunks if first is smaller than this and both will fit.
Default applied here is `0` which essentially disables chunk combining. Must be overridden
by subclass where combining behavior is supported.
"""
arg_value = self._kwargs.get("combine_text_under_n_chars")
return arg_value if arg_value is not None else 0
@lazyproperty
def hard_max(self) -> int:
"""The maximum size for a chunk.
A pre-chunk will only exceed this size when it contains exactly one element which by itself
exceeds this size. Such a pre-chunk is subject to mid-text splitting later in the chunking
process.
"""
arg_value = self._kwargs.get("max_characters")
return arg_value if arg_value is not None else CHUNK_MAX_CHARS_DEFAULT
@lazyproperty
def include_orig_elements(self) -> bool:
"""When True, add original elements from pre-chunk to `.metadata.orig_elements` of chunk.
Default value is `True`.
"""
arg_value = self._kwargs.get("include_orig_elements")
return True if arg_value is None else bool(arg_value)
@lazyproperty
def inter_chunk_overlap(self) -> int:
"""Characters of overlap to add between chunks.
This applies only to boundaries between chunks formed from whole elements and not to
text-splitting boundaries that arise from splitting an oversized element.
"""
overlap_all_arg = self._kwargs.get("overlap_all")
return self.overlap if overlap_all_arg else 0
@lazyproperty
def overlap(self) -> int:
"""The number of characters to overlap text when splitting chunks mid-text.
The actual overlap will not exceed this number of characters but may be less as required to
respect splitting-character boundaries.
"""
overlap_arg = self._kwargs.get("overlap")
return overlap_arg or 0
@lazyproperty
def soft_max(self) -> int:
"""A pre-chunk of this size or greater is considered full.
Note that while a value of `0` is valid, it essentially disables chunking by putting
each element into its own chunk.
"""
hard_max = self.hard_max
new_after_n_chars_arg = self._kwargs.get("new_after_n_chars")
# -- default value is == max_characters --
if new_after_n_chars_arg is None:
return hard_max
# -- new_after_n_chars > max_characters behaves the same as ==max_characters --
if new_after_n_chars_arg > hard_max:
return hard_max
# -- otherwise, give them what they asked for --
return new_after_n_chars_arg
@lazyproperty
def split(self) -> Callable[[str], tuple[str, str]]:
"""A text-splitting function suitable for splitting the text of an oversized pre-chunk.
The function is pre-configured with the chosen chunking window size and any other applicable
options specified by the caller as part of this chunking-options instance.
"""
return _TextSplitter(self)
@lazyproperty
def text_separator(self) -> str:
"""The string to insert between elements when concatenating their text for a chunk.
Right now this is just "\n\n" (a blank line in plain text), but having this here rather
than as a module-level constant provides a way for us to easily make it user-configurable
in future if we want to.
"""
return "\n\n"
@lazyproperty
def text_splitting_separators(self) -> tuple[str, ...]:
"""Sequence of text-splitting target strings to be used in order of preference."""
text_splitting_separators_arg = self._kwargs.get("text_splitting_separators")
return (
("\n", " ")
if text_splitting_separators_arg is None
else tuple(text_splitting_separators_arg)
)
def _validate(self) -> None:
"""Raise ValueError if requestion option-set is invalid."""
max_characters = self.hard_max
# -- chunking window must have positive length --
if max_characters <= 0:
raise ValueError(f"'max_characters' argument must be > 0," f" got {max_characters}")
# -- a negative value for `new_after_n_chars` is assumed to be a mistake the caller will
# -- want to know about
new_after_n_chars = self._kwargs.get("new_after_n_chars")
if new_after_n_chars is not None and new_after_n_chars < 0:
raise ValueError(
f"'new_after_n_chars' argument must be >= 0," f" got {new_after_n_chars}"
)
# -- overlap must be less than max-chars or the chunk text will never be consumed --
if self.overlap >= max_characters:
raise ValueError(
f"'overlap' argument must be less than `max_characters`,"
f" got {self.overlap} >= {max_characters}"
)
# ================================================================================================
# PRE-CHUNKER
# ================================================================================================
class PreChunker:
"""Gathers sequential elements into pre-chunks as length constraints allow.
The pre-chunker's responsibilities are:
- **Segregate semantic units.** Identify semantic unit boundaries and segregate elements on
either side of those boundaries into different sections. In this case, the primary indicator
of a semantic boundary is a `Title` element. A page-break (change in page-number) is also a
semantic boundary when `multipage_sections` is `False`.
- **Minimize chunk count for each semantic unit.** Group the elements within a semantic unit
into sections as big as possible without exceeding the chunk window size.
- **Minimize chunks that must be split mid-text.** Precompute the text length of each section
and only produce a section that exceeds the chunk window size when there is a single element
with text longer than that window.
A Table element is placed into a section by itself. CheckBox elements are dropped.
The "by-title" strategy specifies breaking on section boundaries; a `Title` element indicates
a new "section", hence the "by-title" designation.
"""
def __init__(self, elements: Iterable[Element], opts: ChunkingOptions):
self._elements = elements
self._opts = opts
@classmethod
def iter_pre_chunks(
cls, elements: Iterable[Element], opts: ChunkingOptions
) -> Iterator[PreChunk]:
"""Generate pre-chunks from the element-stream provided on construction."""
return cls(elements, opts)._iter_pre_chunks()
def _iter_pre_chunks(self) -> Iterator[PreChunk]:
"""Generate pre-chunks from the element-stream provided on construction.
A *pre-chunk* is the largest sub-sequence of elements that will both fit within the
chunking window and respects the semantic boundary rules of the chunking strategy. When a
single element exceeds the chunking window size it is placed in a pre-chunk by itself and
is subject to mid-text splitting in the second phase of the chunking process.
"""
pre_chunk_builder = PreChunkBuilder(self._opts)
for element in self._elements:
# -- start new pre-chunk when necessary --
if self._is_in_new_semantic_unit(element) or not pre_chunk_builder.will_fit(element):
yield from pre_chunk_builder.flush()
# -- add this element to the work-in-progress (WIP) pre-chunk --
pre_chunk_builder.add_element(element)
# -- flush "tail" pre-chunk, any partially-filled pre-chunk after last element is
# -- processed
yield from pre_chunk_builder.flush()
@lazyproperty
def _boundary_predicates(self) -> tuple[BoundaryPredicate, ...]:
"""The semantic-boundary detectors to be applied to break pre-chunks."""
return self._opts.boundary_predicates
def _is_in_new_semantic_unit(self, element: Element) -> bool:
"""True when `element` begins a new semantic unit such as a section or page."""
# -- all detectors need to be called to update state and avoid double counting
# -- boundaries that happen to coincide, like Table and new section on same element.
# -- Using `any()` would short-circuit on first True.
semantic_boundaries = [pred(element) for pred in self._boundary_predicates]
return any(semantic_boundaries)
class PreChunkBuilder:
"""An element accumulator suitable for incrementally forming a pre-chunk.
Provides the trial method `.will_fit()` a pre-chunker can use to determine whether it should add
the next element in the element stream.
`.flush()` is used to build a PreChunk object from the accumulated elements. This method
returns an iterator that generates zero-or-one `TextPreChunk` or `TablePreChunk` object and is
used like so:
yield from builder.flush()
If no elements have been accumulated, no `PreChunk` instance is generated. Flushing the builder
clears the elements it contains so it is ready to build the next pre-chunk.
"""
def __init__(self, opts: ChunkingOptions) -> None:
self._opts = opts
self._separator_len = len(opts.text_separator)
self._elements: list[Element] = []
# -- overlap is only between pre-chunks so starts empty --
self._overlap_prefix: str = ""
# -- only includes non-empty element text, e.g. PageBreak.text=="" is not included --
self._text_segments: list[str] = []
# -- combined length of text-segments, not including separators --
self._text_len: int = 0
def add_element(self, element: Element) -> None:
"""Add `element` to this section."""
self._elements.append(element)
if element.text:
self._text_segments.append(element.text)
self._text_len += len(element.text)
def flush(self) -> Iterator[PreChunk]:
"""Generate zero-or-one `PreChunk` object and clear the accumulator.
Suitable for use to emit a PreChunk when the maximum size has been reached or a semantic
boundary has been reached. Also to clear out a terminal pre-chunk at the end of an element
stream.
"""
if not self._elements:
return
pre_chunk = (
TablePreChunk(self._elements[0], self._overlap_prefix, self._opts)
if isinstance(self._elements[0], Table)
# -- copy list, don't use original or it may change contents as builder proceeds --
else TextPreChunk(list(self._elements), self._overlap_prefix, self._opts)
)
# -- clear builder before yield so we're not sensitive to the timing of how/when this
# -- iterator is exhausted and can add elements for the next pre-chunk immediately.
self._reset_state(pre_chunk.overlap_tail)
yield pre_chunk
def will_fit(self, element: Element) -> bool:
"""True when `element` can be added to this prechunk without violating its limits.
There are several limits:
- A `Table` element will never fit with any other element. It will only fit in an empty
pre-chunk.
- No element will fit in a pre-chunk that already contains a `Table` element.
- A text-element will not fit in a pre-chunk that already exceeds the soft-max
(aka. new_after_n_chars).
- A text-element will not fit when together with the elements already present it would
exceed the hard-max (aka. max_characters).
"""
# -- an empty pre-chunk will accept any element (including an oversized-element) --
if len(self._elements) == 0:
return True
# -- a `Table` will not fit in a non-empty pre-chunk --
if isinstance(element, Table):
return False
# -- no element will fit in a pre-chunk that already contains a `Table` element --
if isinstance(self._elements[0], Table):
return False
# -- a pre-chunk that already exceeds the soft-max is considered "full" --
if self._text_length > self._opts.soft_max:
return False
# -- don't add an element if it would increase total size beyond the hard-max --
return not self._remaining_space < len(element.text)
@property
def _remaining_space(self) -> int:
"""Maximum text-length of an element that can be added without exceeding maxlen."""
# -- include length of trailing separator that will go before next element text --
separators_len = self._separator_len * len(self._text_segments)
return self._opts.hard_max - self._text_len - separators_len
def _reset_state(self, overlap_prefix: str) -> None:
"""Set working-state values back to "empty", ready to accumulate next pre-chunk."""
self._overlap_prefix = overlap_prefix
self._elements.clear()
self._text_segments = [overlap_prefix] if overlap_prefix else []
self._text_len = len(overlap_prefix)
@property
def _text_length(self) -> int:
"""Length of the text in this pre-chunk.
This value represents the chunk-size that would result if this pre-chunk was flushed in its
current state. In particular, it does not include the length of a trailing separator (since
that would only appear if an additional element was added).
Not suitable for judging remaining space, use `.remaining_space` for that value.
"""
# -- number of text separators present in joined text of elements. This includes only
# -- separators *between* text segments, not one at the end. Note there are zero separators
# -- for both 0 and 1 text-segments.
n = len(self._text_segments)
separator_count = n - 1 if n else 0
return self._text_len + (separator_count * self._separator_len)
# ================================================================================================
# PRE-CHUNK SUB-TYPES
# ================================================================================================
class TablePreChunk:
"""A pre-chunk composed of a single Table element."""
def __init__(self, table: Table, overlap_prefix: str, opts: ChunkingOptions) -> None:
self._table = table
self._overlap_prefix = overlap_prefix
self._opts = opts
def iter_chunks(self) -> Iterator[Table | TableChunk]:
"""Split this pre-chunk into `Table` or `TableChunk` objects maxlen or smaller."""
# -- A table with no non-whitespace text produces no chunks --
if not self._table_text:
return
# -- only text-split a table when it's longer than the chunking window --
maxlen = self._opts.hard_max
if len(self._text_with_overlap) <= maxlen and len(self._html) <= maxlen:
# -- use the compactified html for .text_as_html, even though we're not splitting --
metadata = self._metadata
metadata.text_as_html = self._html or None
# -- note the overlap-prefix is prepended to its text --
yield Table(text=self._text_with_overlap, metadata=metadata)
return
# -- When there's no HTML, split it like a normal element. Also fall back to text-only
# -- chunks when `max_characters` is less than 50. `.text_as_html` metadata is impractical
# -- for a chunking window that small because the 33 characterss of HTML overhead for each
# -- chunk (`
`) would produce a very large number of
# -- very small chunks.
if not self._html or self._opts.hard_max < 50:
yield from self._iter_text_only_table_chunks()
return
# -- otherwise, form splits with "synchronized" text and html --
yield from self._iter_text_and_html_table_chunks()
@lazyproperty
def overlap_tail(self) -> str:
"""The portion of this chunk's text to be repeated as a prefix in the next chunk.
This value is the empty-string ("") when either the `.overlap` length option is `0` or
`.overlap_all` is `False`. When there is a text value, it is stripped of both leading and
trailing whitespace.
"""
overlap = self._opts.inter_chunk_overlap
return self._text_with_overlap[-overlap:].strip() if overlap else ""
@lazyproperty
def _html(self) -> str:
"""The compactified HTML for this table when it has text-as-HTML.
The empty string when table-structure has not been captured, perhaps because
`infer_table_structure` was set `False` in the partitioning call.
"""
if not (html_table := self._html_table):
return ""
return html_table.html
@lazyproperty
def _html_table(self) -> HtmlTable | None:
"""The `lxml` HTML element object for this table.
`None` when the `Table` element has no `.metadata.text_as_html`.
"""
if (text_as_html := self._table.metadata.text_as_html) is None:
return None
text_as_html = text_as_html.strip()
if not text_as_html: # pragma: no cover
return None
return HtmlTable.from_html_text(text_as_html)
def _iter_text_and_html_table_chunks(self) -> Iterator[TableChunk]:
"""Split table into chunks where HTML corresponds exactly to text.
`.metadata.text_as_html` for each chunk is a parsable `` HTML fragment.
"""
if (html_table := self._html_table) is None: # pragma: no cover
raise ValueError("this method is undefined for a table having no .text_as_html")
is_continuation = False
for text, html in _TableSplitter.iter_subtables(html_table, self._opts):
metadata = self._metadata
metadata.text_as_html = html
# -- second and later chunks get `.metadata.is_continuation = True` --
metadata.is_continuation = is_continuation or None
is_continuation = True
yield TableChunk(text=text, metadata=metadata)
def _iter_text_only_table_chunks(self) -> Iterator[TableChunk]:
"""Split oversized text-only table (no text-as-html) into chunks."""
text_remainder = self._text_with_overlap
split = self._opts.split
is_continuation = False
while text_remainder:
# -- split off the next chunk-worth of characters into a TableChunk --
chunk_text, text_remainder = split(text_remainder)
metadata = self._metadata
# -- second and later chunks get `.metadata.is_continuation = True` --
metadata.is_continuation = is_continuation or None
is_continuation = True
yield TableChunk(text=chunk_text, metadata=metadata)
@property
def _metadata(self) -> ElementMetadata:
"""The base `.metadata` value for chunks formed from this pre-chunk.
The term "base" here means that other metadata fields will be added, depending on the
chunk. In particular, `.metadata.text_as_html` will be different for each text-split chunk
and `.metadata.is_continuation` must be added for second-and-later text-split chunks.
Note this is a fresh copy of the metadata on each call since it will need to be mutated
differently for each chunk formed from this pre-chunk.
"""
CS = ConsolidationStrategy
metadata = copy.deepcopy(self._table.metadata)
# -- drop metadata fields not appropriate for chunks, in particular
# -- parent_id's will not reliably point to an existing element
drop_field_names = [
field_name
for field_name, strategy in CS.field_consolidation_strategies().items()
if strategy is CS.DROP
]
for field_name in drop_field_names:
setattr(metadata, field_name, None)
if self._opts.include_orig_elements:
metadata.orig_elements = self._orig_elements
return metadata
@lazyproperty
def _orig_elements(self) -> list[Element]:
"""The `.metadata.orig_elements` value for chunks formed from this pre-chunk.
Note this is not just the `Table` element, it must be adjusted to strip out any
`.metadata.orig_elements` value it may have when it is itself a chunk and not a direct
product of partitioning.
"""
# -- make a copy because we're going to mutate the `Table` element and it doesn't belong to
# -- us (the user may have downstream purposes for it).
orig_table = copy.deepcopy(self._table)
# -- prevent recursive .orig_elements when `Table` element is a chunk --
orig_table.metadata.orig_elements = None
return [orig_table]
@lazyproperty
def _table_text(self) -> str:
"""The text in this table, not including any overlap-prefix or extra whitespace."""
return " ".join(self._table.text.split())
@lazyproperty
def _text_with_overlap(self) -> str:
"""The text for this chunk, including the overlap-prefix when present."""
overlap_prefix = self._overlap_prefix
table_text = self._table.text.strip()
# -- use row-separator between overlap and table-text --
return overlap_prefix + "\n" + table_text if overlap_prefix else table_text
class TextPreChunk:
"""A sequence of elements that belong to the same semantic unit within a document.
The name "section" derives from the idea of a document-section, a heading followed by the
paragraphs "under" that heading. That structure is not found in all documents and actual section
content can vary, but that's the concept.
This object is purposely immutable.
"""
def __init__(
self, elements: Iterable[Element], overlap_prefix: str, opts: ChunkingOptions
) -> None:
self._elements = list(elements)
self._overlap_prefix = overlap_prefix
self._opts = opts
def __eq__(self, other: Any) -> bool:
if not isinstance(other, TextPreChunk):
return False
return self._overlap_prefix == other._overlap_prefix and self._elements == other._elements
def can_combine(self, pre_chunk: TextPreChunk) -> bool:
"""True when `pre_chunk` can be combined with this one without exceeding size limits."""
if len(self._text) >= self._opts.combine_text_under_n_chars:
return False
# -- avoid duplicating length computations by doing a trial-combine which is just as
# -- efficient and definitely more robust than hoping two different computations of combined
# -- length continue to get the same answer as the code evolves. Only possible because
# -- `.combine()` is non-mutating.
combined_len = len(self.combine(pre_chunk)._text)
return combined_len <= self._opts.hard_max
def combine(self, other_pre_chunk: TextPreChunk) -> TextPreChunk:
"""Return new `TextPreChunk` that combines this and `other_pre_chunk`."""
# -- combined pre-chunk gets the overlap-prefix of the first pre-chunk. The second overlap
# -- is automatically incorporated at the end of the first chunk, where it originated.
return TextPreChunk(
self._elements + other_pre_chunk._elements,
overlap_prefix=self._overlap_prefix,
opts=self._opts,
)
def iter_chunks(self) -> Iterator[CompositeElement]:
"""Split this pre-chunk into one or more `CompositeElement` objects maxlen or smaller."""
# -- a pre-chunk containing no text (maybe only a PageBreak element for example) does not
# -- generate any chunks.
if not self._text:
return
split = self._opts.split
# -- emit first chunk --
s, remainder = split(self._text)
yield CompositeElement(text=s, metadata=self._consolidated_metadata)
# -- an oversized pre-chunk will have a remainder, split that up into additional chunks.
# -- Note these get continuation_metadata which includes is_continuation=True.
while remainder:
s, remainder = split(remainder)
yield CompositeElement(text=s, metadata=self._continuation_metadata)
@lazyproperty
def overlap_tail(self) -> str:
"""The portion of this chunk's text to be repeated as a prefix in the next chunk.
This value is the empty-string ("") when either the `.overlap` length option is `0` or
`.overlap_all` is `False`. When there is a text value, it is stripped of both leading and
trailing whitespace.
"""
overlap = self._opts.inter_chunk_overlap
return self._text[-overlap:].strip() if overlap else ""
@lazyproperty
def _all_metadata_values(self) -> dict[str, list[Any]]:
"""Collection of all populated metadata values across elements.
The resulting dict has one key for each `ElementMetadata` field that had a non-None value in
at least one of the elements in this pre-chunk. The value of that key is a list of all those
populated values, in element order, for example:
{
"filename": ["sample.docx", "sample.docx"],
"languages": [["lat"], ["lat", "eng"]]
...
}
This preprocessing step provides the input for a specified consolidation strategy that will
resolve the list of values for each field to a single consolidated value.
"""
def iter_populated_fields(metadata: ElementMetadata) -> Iterator[tuple[str, Any]]:
"""(field_name, value) pair for each non-None field in single `ElementMetadata`."""
return (
(field_name, value)
for field_name, value in metadata.known_fields.items()
if value is not None
)
field_values: DefaultDict[str, list[Any]] = collections.defaultdict(list)
# -- collect all non-None field values in a list for each field, in element-order --
for e in self._elements:
for field_name, value in iter_populated_fields(e.metadata):
field_values[field_name].append(value)
return dict(field_values)
@lazyproperty
def _consolidated_metadata(self) -> ElementMetadata:
"""Metadata applicable to this pre-chunk as a single chunk.
Formed by applying consolidation rules to all metadata fields across the elements of this
pre-chunk.
For the sake of consistency, the same rules are applied (for example, for dropping values)
to a single-element pre-chunk too, even though metadata for such a pre-chunk is already
"consolidated".
"""
consolidated_metadata = ElementMetadata(**self._meta_kwargs)
if self._opts.include_orig_elements:
consolidated_metadata.orig_elements = self._orig_elements
return consolidated_metadata
@lazyproperty
def _continuation_metadata(self) -> ElementMetadata:
"""Metadata applicable to the second and later text-split chunks of the pre-chunk.
The same metadata as the first text-split chunk but includes `.is_continuation = True`.
Unused for non-oversized pre-chunks since those are not subject to text-splitting.
"""
# -- we need to make a copy, otherwise adding a field would also change metadata value
# -- already assigned to another chunk (e.g. the first text-split chunk). Deep-copy is not
# -- required though since we're not changing any collection fields.
continuation_metadata = copy.copy(self._consolidated_metadata)
continuation_metadata.is_continuation = True
return continuation_metadata
def _iter_text_segments(self) -> Iterator[str]:
"""Generate overlap text and each element text segment in order.
Empty text segments are not included.
"""
if self._overlap_prefix:
yield self._overlap_prefix
for e in self._elements:
if not e.text:
continue
yield e.text
@lazyproperty
def _meta_kwargs(self) -> dict[str, Any]:
"""The consolidated metadata values as a dict suitable for constructing ElementMetadata.
This is where consolidation strategies are actually applied. The output is suitable for use
in constructing an `ElementMetadata` object like `ElementMetadata(**self._meta_kwargs)`.
"""
CS = ConsolidationStrategy
field_consolidation_strategies = ConsolidationStrategy.field_consolidation_strategies()
def iter_kwarg_pairs() -> Iterator[tuple[str, Any]]:
"""Generate (field-name, value) pairs for each field in consolidated metadata."""
for field_name, values in self._all_metadata_values.items():
strategy = field_consolidation_strategies.get(field_name)
if strategy is CS.FIRST:
yield field_name, values[0]
# -- concatenate lists from each element that had one, in order --
elif strategy is CS.LIST_CONCATENATE:
yield field_name, sum(values, cast("list[Any]", []))
# -- union lists from each element, preserving order of appearance --
elif strategy is CS.LIST_UNIQUE:
# -- Python 3.7+ maintains dict insertion order --
ordered_unique_keys = {key: None for val_list in values for key in val_list}
yield field_name, list(ordered_unique_keys.keys())
elif strategy is CS.STRING_CONCATENATE:
yield field_name, " ".join(val.strip() for val in values)
elif strategy is CS.DROP:
continue
else: # pragma: no cover
# -- not likely to hit this since we have a test in `text_elements.py` that
# -- ensures every ElementMetadata fields has an assigned strategy.
raise NotImplementedError(
f"metadata field {repr(field_name)} has no defined consolidation strategy"
)
return dict(iter_kwarg_pairs())
@lazyproperty
def _orig_elements(self) -> list[Element]:
"""The `.metadata.orig_elements` value for chunks formed from this pre-chunk."""
def iter_orig_elements():
for e in self._elements:
if e.metadata.orig_elements is None:
yield e
continue
# -- make copy of any element we're going to mutate because these elements don't
# -- belong to us (the user may have downstream purposes for them).
orig_element = copy.copy(e)
# -- prevent recursive .orig_elements when element is a chunk (has orig-elements of
# -- its own)
orig_element.metadata.orig_elements = None
yield orig_element
return list(iter_orig_elements())
@lazyproperty
def _text(self) -> str:
"""The concatenated text of all elements in this pre-chunk.
Each element-text is separated from the next by a blank line ("\n\n").
"""
text_separator = self._opts.text_separator
return text_separator.join(self._iter_text_segments())
# ================================================================================================
# PRE-CHUNK SPLITTERS
# ================================================================================================
class _TableSplitter:
"""Produces (text, html) pairs for a `` HtmlElement.
Each chunk contains a whole number of rows whenever possible. An oversized row is split on an
even cell boundary and a single cell that is by itself too big to fit in the chunking window
is divided by text-splitting.
The returned `html` value is always a parseable HTML `` subtree.
"""
def __init__(self, table_element: HtmlTable, opts: ChunkingOptions):
self._table_element = table_element
self._opts = opts
@classmethod
def iter_subtables(
cls, table_element: HtmlTable, opts: ChunkingOptions
) -> Iterator[TextAndHtml]:
"""Generate (text, html) pair for each split of this table pre-chunk.
Each split is on an even row boundary whenever possible, falling back to even cell and even
word boundaries when a row or cell is by itself oversized, respectively.
"""
return cls(table_element, opts)._iter_subtables()
def _iter_subtables(self) -> Iterator[TextAndHtml]:
"""Generate (text, html) pairs containing as many whole rows as will fit in window.
Falls back to splitting rows into whole cells when a single row is by itself too big to
fit in the chunking window.
"""
accum = _RowAccumulator(maxlen=self._opts.hard_max)
for row in self._table_element.iter_rows():
# -- if row won't fit, any WIP chunk is done, send it on its way --
if not accum.will_fit(row):
yield from accum.flush()
# -- if row fits, add it to accumulator --
if accum.will_fit(row):
accum.add_row(row)
else: # -- otherwise, single row is bigger than chunking window --
yield from self._iter_row_splits(row)
yield from accum.flush()
def _iter_row_splits(self, row: HtmlRow) -> Iterator[TextAndHtml]:
"""Split oversized row into (text, html) pairs containing as many cells as will fit."""
accum = _CellAccumulator(maxlen=self._opts.hard_max)
for cell in row.iter_cells():
# -- if cell won't fit, flush and check again --
if not accum.will_fit(cell):
yield from accum.flush()
# -- if cell fits, add it to accumulator --
if accum.will_fit(cell):
accum.add_cell(cell)
else: # -- otherwise, single cell is bigger than chunking window --
yield from self._iter_cell_splits(cell)
yield from accum.flush()
def _iter_cell_splits(self, cell: HtmlCell) -> Iterator[TextAndHtml]:
"""Split a single oversized cell into sub-sub-sub-table HTML fragments."""
# -- 33 is len(""), HTML overhead beyond text content --
opts = ChunkingOptions(max_characters=(self._opts.hard_max - 33))
split = _TextSplitter(opts)
text, remainder = split(cell.text)
yield text, f""
# -- an oversized cell will have a remainder, split that up into additional chunks.
while remainder:
text, remainder = split(remainder)
yield text, f""
class _TextSplitter:
"""Provides a text-splitting function configured on construction.
Text is split on the best-available separator, falling-back from the preferred separator
through a sequence of alternate separators.
- The separator is removed by splitting so only whitespace strings are suitable separators.
- A "blank-line" ("\n\n") is unlikely to occur in an element as it would have been used as an
element boundary during partitioning.
This is a *callable* object. Constructing it essentially produces a function:
split = _TextSplitter(opts)
fragment, remainder = split(s)
This allows it to be configured with length-options etc. on construction and used throughout a
chunking operation on a given element-stream.
"""
def __init__(self, opts: ChunkingOptions):
self._opts = opts
def __call__(self, s: str) -> tuple[str, str]:
"""Return pair of strings split from `s` on the best match of configured patterns.
The first string is the split, the second is the remainder of the string. The split string
will never be longer than `maxlen`. The separators are tried in order until a match is
found. The last separator is "" which matches between any two characters so there will
always be a split.
The separator is removed and does not appear in the split or remainder.
An `s` that is already less than the maximum length is returned unchanged with no remainder.
This allows this function to be called repeatedly with the remainder until it is consumed
and returns a remainder of "".
"""
maxlen = self._opts.hard_max
if len(s) <= maxlen:
return s, ""
for p, sep_len in self._patterns:
# -- length of separator must be added to include that separator when it happens to be
# -- located exactly at maxlen. Otherwise the search-from-end regex won't find it.
fragment, remainder = self._split_from_maxlen(p, sep_len, s)
if (
# -- no available split with this separator --
not fragment
# -- split did not progress, consuming part of the string --
or len(remainder) >= len(s)
):
continue
return fragment.rstrip(), remainder.lstrip()
# -- the terminal "" pattern is not actually executed via regex since its implementation is
# -- trivial and provides a hard back-stop here in this method. No separator is used between
# -- tail and remainder on arb-char split.
return s[:maxlen].rstrip(), s[maxlen - self._opts.overlap :].lstrip()
@lazyproperty
def _patterns(self) -> tuple[tuple[regex.Pattern[str], int], ...]:
"""Sequence of (pattern, len) pairs to match against.
Patterns appear in order of preference, those following are "fall-back" patterns to be used
if no match of a prior pattern is found.
NOTE these regexes search *from the end of the string*, which is what the "(?r)" bit
specifies. This is much more efficient than starting at the beginning of the string which
could result in hundreds of matches before the desired one.
"""
separators = self._opts.text_splitting_separators
return tuple((regex.compile(f"(?r){sep}"), len(sep)) for sep in separators)
def _split_from_maxlen(
self, pattern: regex.Pattern[str], sep_len: int, s: str
) -> tuple[str, str]:
"""Return (split, remainder) pair split from `s` on the right-most match before `maxlen`.
Returns `"", s` if no suitable match was found. Also returns `"", s` if splitting on this
separator produces a split shorter than the required overlap (which would produce an
infinite loop).
`split` will never be longer than `maxlen` and there is no longer split available using
`pattern`.
The separator is removed and does not appear in either the split or remainder.
"""
maxlen, overlap = self._opts.hard_max, self._opts.overlap
# -- A split not longer than overlap will not progress (infinite loop). On the right side,
# -- need to extend search range to include a separator located exactly at maxlen.
match = pattern.search(s, pos=overlap + 1, endpos=maxlen + sep_len)
if match is None:
return "", s
# -- characterize match location
match_start, match_end = match.span()
# -- matched separator is replaced by single-space in overlap string --
separator = " "
# -- in multi-space situation, fragment may have trailing whitespace because match is from
# -- right to left
fragment = s[:match_start].rstrip()
# -- remainder can have leading space when match is on "\n" followed by spaces --
raw_remainder = s[match_end:].lstrip()
if overlap <= len(separator):
return fragment, raw_remainder
# -- compute overlap --
tail_len = overlap - len(separator)
tail = fragment[-tail_len:].lstrip()
overlapped_remainder = tail + separator + raw_remainder
return fragment, overlapped_remainder
class _CellAccumulator:
"""Incrementally build `` fragment cell-by-cell to maximally fill chunking window.
Accumulate cells until chunking window is filled, then generate the text and HTML for the
subtable composed of all those rows that fit in the window.
"""
def __init__(self, maxlen: int):
self._maxlen = maxlen
self._cells: list[HtmlCell] = []
def add_cell(self, cell: HtmlCell) -> None:
"""Add `cell` to this accumulation. Caller is responsible for ensuring it will fit."""
self._cells.append(cell)
def flush(self) -> Iterator[TextAndHtml]:
"""Generate zero-or-one (text, html) pairs for accumulated sub-sub-table."""
if not self._cells:
return
text = " ".join(self._iter_cell_texts())
tds_str = "".join(c.html for c in self._cells)
html = f""
self._cells.clear()
yield text, html
def will_fit(self, cell: HtmlCell) -> bool:
"""True when `cell` will fit within remaining space left by accummulated cells."""
return self._remaining_space >= len(cell.html)
def _iter_cell_texts(self) -> Iterator[str]:
"""Generate contents of each accumulated cell as a separate string.
A cell that is empty or contains only whitespace does not generate a string.
"""
for cell in self._cells:
if not (text := cell.text):
continue
yield text
@property
def _remaining_space(self) -> int:
"""Number of characters remaining when accumulated cells are formed into HTML."""
# -- 24 is `len("")`, the overhead in addition to ``
# -- HTML fragments
return self._maxlen - 24 - sum(len(c.html) for c in self._cells)
class _RowAccumulator:
"""Maybe `SubtableAccumulator`.
Accumulate rows until chunking window is filled, then generate the text and HTML for the
subtable composed of all those rows that fit in the window.
"""
def __init__(self, maxlen: int):
self._maxlen = maxlen
self._rows: list[HtmlRow] = []
def add_row(self, row: HtmlRow) -> None:
"""Add `row` to this accumulation. Caller is responsible for ensuring it will fit."""
self._rows.append(row)
def flush(self) -> Iterator[TextAndHtml]:
"""Generate zero-or-one (text, html) pairs for accumulated sub-table."""
if not self._rows:
return
text = " ".join(self._iter_cell_texts())
trs_str = "".join(r.html for r in self._rows)
html = f""
self._rows.clear()
yield text, html
def will_fit(self, row: HtmlRow) -> bool:
"""True when `row` will fit within remaining space left by accummulated rows."""
return self._remaining_space >= len(row.html)
def _iter_cell_texts(self) -> Iterator[str]:
"""Generate contents of each row cell as a separate string.
A cell that is empty or contains only whitespace does not generate a string.
"""
for r in self._rows:
yield from r.iter_cell_texts()
@property
def _remaining_space(self) -> int:
"""Number of characters remaining when accumulated rows are formed into HTML."""
# -- 15 is `len("")`, the overhead in addition to ` | ` HTML fragments --
return self._maxlen - 15 - sum(len(r.html) for r in self._rows)
# ================================================================================================
# PRE-CHUNK COMBINER
# ================================================================================================
class PreChunkCombiner:
"""Filters pre-chunk stream to combine small pre-chunks where possible."""
def __init__(self, pre_chunks: Iterable[PreChunk], opts: ChunkingOptions):
self._pre_chunks = pre_chunks
self._opts = opts
def iter_combined_pre_chunks(self) -> Iterator[PreChunk]:
"""Generate pre-chunk objects, combining TextPreChunk objects when they'll fit in window."""
accum = TextPreChunkAccumulator(self._opts)
for pre_chunk in self._pre_chunks:
# -- a table pre-chunk is never combined --
if isinstance(pre_chunk, TablePreChunk):
yield from accum.flush()
yield pre_chunk
continue
# -- finish accumulating pre-chunk when it's full --
if not accum.will_fit(pre_chunk):
yield from accum.flush()
accum.add_pre_chunk(pre_chunk)
yield from accum.flush()
class TextPreChunkAccumulator:
"""Accumulates, measures, and combines text pre-chunks.
Used for combining pre-chunks for chunking strategies like "by-title" that can potentially
produce undersized chunks and offer the `combine_text_under_n_chars` option. Note that only
sequential `TextPreChunk` objects can be combined. A `TablePreChunk` is never combined with
another pre-chunk.
Provides `.add_pre_chunk()` allowing a pre-chunk to be added to the chunk and provides
monitoring properties `.remaining_space` and `.text_length` suitable for deciding whether to add
another pre-chunk.
`.flush()` is used to combine the accumulated pre-chunks into a single `TextPreChunk` object.
This method returns an interator that generates zero-or-one `TextPreChunk` objects and is used
like so:
yield from accum.flush()
If no pre-chunks have been accumulated, no `TextPreChunk` is generated. Flushing the builder
clears the pre-chunks it contains so it is ready to accept the next text-pre-chunk.
"""
def __init__(self, opts: ChunkingOptions) -> None:
self._opts = opts
self._pre_chunk: TextPreChunk | None = None
def add_pre_chunk(self, pre_chunk: TextPreChunk) -> None:
"""Add a pre-chunk to the accumulator for possible combination with next pre-chunk."""
self._pre_chunk = (
pre_chunk if self._pre_chunk is None else self._pre_chunk.combine(pre_chunk)
)
def flush(self) -> Iterator[TextPreChunk]:
"""Generate accumulated pre-chunk as a single combined pre-chunk.
Does not generate a pre-chunk when none has been accumulated.
"""
# -- nothing to do if no pre-chunk has been accumulated --
if not self._pre_chunk:
return
# -- otherwise generate the combined pre-chunk --
yield self._pre_chunk
# -- and reset the accumulator (to empty) --
self._pre_chunk = None
def will_fit(self, pre_chunk: TextPreChunk) -> bool:
"""True when there is room for `pre_chunk` in accumulator.
An empty accumulator always has room. Otherwise there is only room when `pre_chunk` can be
combined with any other pre-chunks in the accumulator without exceeding the combination
limits specified for the chunking run.
"""
# -- an empty accumulator always has room --
if self._pre_chunk is None:
return True
return self._pre_chunk.can_combine(pre_chunk)
# ================================================================================================
# CHUNK BOUNDARY PREDICATES
# ------------------------------------------------------------------------------------------------
# A *boundary predicate* is a function that takes an element and returns True when the element
# represents the start of a new semantic boundary (such as section or page) to be respected in
# chunking.
#
# Some of the functions below *are* a boundary predicate and others *construct* a boundary
# predicate.
#
# These can be mixed and matched to produce different chunking behaviors like "by_title" or left
# out altogether to produce "by_element" behavior.
#
# The effective lifetime of the function that produce a predicate (rather than directly being one)
# is limited to a single element-stream because these retain state (e.g. current page number) to
# determine when a semantic boundary has been crossed.
# ================================================================================================
def is_on_next_page() -> BoundaryPredicate:
"""Not a predicate itself, calling this returns a predicate that triggers on each new page.
The lifetime of the returned callable cannot extend beyond a single element-stream because it
stores current state (current page-number) that is particular to that element stream.
The returned predicate tracks the "current" page-number, starting at 1. An element with a
greater page number returns True, indicating the element starts a new page boundary, and
updates the enclosed page-number ready for the next transition.
An element with `page_number == None` or a page-number lower than the stored value is ignored
and returns False.
"""
current_page_number: int = 1
is_first: bool = True
def page_number_incremented(element: Element) -> bool:
nonlocal current_page_number, is_first
page_number = element.metadata.page_number
# -- The first element never reports a page break, it starts the first page of the
# -- document. That page could be numbered (page_number is non-None) or not. If it is not
# -- numbered we assign it page-number 1.
if is_first:
current_page_number = page_number or 1
is_first = False
return False
# -- An element with a `None` page-number is assumed to continue the current page. It never
# -- updates the current-page-number because once set, the current-page-number is "sticky"
# -- until replaced by a different explicit page-number.
if page_number is None:
return False
if page_number == current_page_number:
return False
# -- it's possible for a page-number to decrease. We don't expect that, but if it happens
# -- we consider it a page-break.
current_page_number = page_number
return True
return page_number_incremented
def is_title(element: Element) -> bool:
"""True when `element` is a `Title` element, False otherwise."""
return isinstance(element, Title)