rfctr(chunk): prep for adding TableSplitter (#3510)

**Summary**
Mechanical refactoring in preparation for adding (pre-chunk)
`TableSplitter` in a PR stacked on this one.
This commit is contained in:
Steve Canny 2024-08-12 11:04:49 -07:00 committed by GitHub
parent d99b39923d
commit cbe1b35621
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 674 additions and 564 deletions

View File

@ -1,4 +1,4 @@
## 0.15.2-dev6
## 0.15.2-dev7
### Enhancements

View File

@ -4,7 +4,7 @@
from __future__ import annotations
from typing import Any, Optional, Sequence
from typing import Any, Sequence
import pytest
@ -65,7 +65,7 @@ class DescribeChunkingOptions:
("combine_text_under_n_chars", "expected_value"), [(None, 0), (42, 42)]
)
def it_accepts_combine_text_under_n_chars_in_constructor_but_defaults_to_no_combining(
self, combine_text_under_n_chars: Optional[int], expected_value: int
self, combine_text_under_n_chars: int | None, expected_value: int
):
"""Subclasses can store `combine_text_under_n_chars` but must validate and enable it.
@ -153,107 +153,6 @@ class DescribeChunkingOptions:
assert ChunkingOptions().text_separator == "\n\n"
class Describe_TextSplitter:
"""Unit-test suite for `unstructured.chunking.base._TextSplitter` objects."""
# -- Every test builds a `ChunkingOptions`, wraps it in a `_TextSplitter`, calls the splitter on
# -- a string and checks the returned `(split, remainder)` pair.
def it_splits_on_a_preferred_separator_when_it_can(self):
# -- "\n" is listed ahead of " " in `text_splitting_separators`; the only "\n" in `text`
# -- sits just after "adipiscing." and that is where the split is expected --
opts = ChunkingOptions(max_characters=50, text_splitting_separators=("\n", " "), overlap=10)
split = _TextSplitter(opts)
text = (
"Lorem ipsum dolor amet consectetur adipiscing. \n "
"In rhoncus ipsum sed lectus porta."
)
s, remainder = split(text)
# -- trailing whitespace is stripped from split --
assert s == "Lorem ipsum dolor amet consectetur adipiscing."
# -- leading whitespace is stripped from remainder
# -- overlap is separated by single space
# -- overlap-prefix is computed on arbitrary character boundary
# -- overlap-prefix len includes space separator (text portion is one less than specified)
assert remainder == "ipiscing. In rhoncus ipsum sed lectus porta."
# --
# -- the remainder is now short enough to come back whole, with nothing left over --
s, remainder = split(remainder)
assert s == "ipiscing. In rhoncus ipsum sed lectus porta."
assert remainder == ""
def and_it_splits_on_the_next_available_separator_when_the_first_is_not_available(self):
# -- `text` contains no "\n" at all, so every expected split falls on a space --
opts = ChunkingOptions(max_characters=40, text_splitting_separators=("\n", " "), overlap=10)
split = _TextSplitter(opts)
text = (
"Lorem ipsum dolor amet consectetur adipiscing. In rhoncus ipsum sed lectus porta"
" volutpat."
)
s, remainder = split(text)
assert s == "Lorem ipsum dolor amet consectetur"
assert remainder == "nsectetur adipiscing. In rhoncus ipsum sed lectus porta volutpat."
# --
s, remainder = split(remainder)
assert s == "nsectetur adipiscing. In rhoncus ipsum"
assert remainder == "cus ipsum sed lectus porta volutpat."
# --
s, remainder = split(remainder)
assert s == "cus ipsum sed lectus porta volutpat."
assert remainder == ""
def and_it_splits_on_an_arbitrary_character_as_a_last_resort(self):
# -- the first 30 characters of `text` contain neither "\n" nor " ", so the first split is
# -- expected to be cut mid-word at exactly `max_characters` --
opts = ChunkingOptions(max_characters=30, text_splitting_separators=("\n", " "), overlap=10)
split = _TextSplitter(opts)
text = "Loremipsumdolorametconsecteturadipiscingelit. In rhoncus ipsum sed lectus porta."
s, remainder = split(text)
assert s == "Loremipsumdolorametconsectetur"
assert remainder == "onsecteturadipiscingelit. In rhoncus ipsum sed lectus porta."
# --
# -- once spaces are within reach, later splits land on them again --
s, remainder = split(remainder)
assert s == "onsecteturadipiscingelit. In"
assert remainder == "gelit. In rhoncus ipsum sed lectus porta."
# --
s, remainder = split(remainder)
assert s == "gelit. In rhoncus ipsum sed"
assert remainder == "ipsum sed lectus porta."
@pytest.mark.parametrize(
"text",
[
"Lorem ipsum dolor amet consectetur adipiscing.", # 46-chars
"Lorem ipsum dolor.", # 18-chars
],
)
def it_does_not_split_a_string_that_is_not_longer_than_maxlen(self, text: str):
# -- covers both the exactly-at-limit case (46 == max_characters) and a shorter string --
opts = ChunkingOptions(max_characters=46, overlap=10)
split = _TextSplitter(opts)
s, remainder = split(text)
assert s == text
assert remainder == ""
def it_fills_the_window_when_falling_back_to_an_arbitrary_character_split(self):
# -- no `text_splitting_separators` argument is passed here, so the options' defaults apply --
opts = ChunkingOptions(max_characters=38, overlap=10)
split = _TextSplitter(opts)
text = "Loremipsumdolorametconsecteturadipiscingelit. In rhoncus ipsum sed lectus porta."
s, _ = split(text)
# -- the split is expected to use the whole 38-character window --
assert s == "Loremipsumdolorametconsecteturadipisci"
assert len(s) == 38
@pytest.mark.parametrize("separators", [("\n", " "), (" ",)])
def it_strips_whitespace_around_the_split(self, separators: Sequence[str]):
# -- the same expectations must hold whether or not "\n" is among the separators --
opts = ChunkingOptions(max_characters=50, text_splitting_separators=separators, overlap=10)
split = _TextSplitter(opts)
text = "Lorem ipsum dolor amet consectetur adipiscing. \n\n In rhoncus ipsum sed lectus."
# |-------------------------------------------------^ 50-chars
s, remainder = split(text)
# -- none of the " \n\n " run at the cut survives in either the split or the remainder --
assert s == "Lorem ipsum dolor amet consectetur adipiscing."
assert remainder == "ipiscing. In rhoncus ipsum sed lectus."
# ================================================================================================
# PRE-CHUNKER
# ================================================================================================
@ -305,6 +204,169 @@ class DescribePreChunker:
next(pre_chunk_iter)
class DescribePreChunkBuilder:
"""Unit-test suite for `unstructured.chunking.base.PreChunkBuilder`."""
# -- These tests read the builder's private `_text_length` and `_remaining_space` directly to
# -- observe its state after `add_element()` and `flush()` calls.
def it_is_empty_on_construction(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=50))
# -- with nothing added, the whole `max_characters` budget is still available --
assert builder._text_length == 0
assert builder._remaining_space == 50
def it_accumulates_elements_added_to_it(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=150))
builder.add_element(Title("Introduction"))
# -- 12 chars of text; 150 - 12 - 2 (trailing separator) == 136 --
assert builder._text_length == 12
assert builder._remaining_space == 136
builder.add_element(
Text(
# -- the two adjacent literals are concatenated with no space between "sed" and
# -- "lectus", giving a 98-char text; the length assertions below depend on that --
"Lorem ipsum dolor sit amet consectetur adipiscing elit. In rhoncus ipsum sed"
"lectus porta volutpat.",
),
)
# -- 12 + 2 (separator) + 98 == 112; 150 - 112 - 2 == 36 --
assert builder._text_length == 112
assert builder._remaining_space == 36
@pytest.mark.parametrize("element", [Table("Heading\nCell text"), Text("abcd " * 200)])
def it_will_fit_a_Table_or_oversized_element_when_empty(self, element: Element):
# -- `"abcd " * 200` is 1000 characters; `ChunkingOptions()` is given no explicit limit --
builder = PreChunkBuilder(opts=ChunkingOptions())
assert builder.will_fit(element)
@pytest.mark.parametrize(
("existing_element", "next_element"),
[
(Text("abcd"), Table("Fruits\nMango")),
(Text("abcd"), Text("abcd " * 200)),
(Table("Heading\nCell text"), Table("Fruits\nMango")),
(Table("Heading\nCell text"), Text("abcd " * 200)),
],
)
def but_not_when_it_already_contains_an_element_of_any_kind(
self, existing_element: Element, next_element: Element
):
# -- same candidate kinds as the test above (a Table, a 1000-char Text), but offered to a
# -- builder that already holds one element --
builder = PreChunkBuilder(opts=ChunkingOptions())
builder.add_element(existing_element)
assert not builder.will_fit(next_element)
@pytest.mark.parametrize("element", [Text("abcd"), Table("Fruits\nMango")])
def it_will_not_fit_any_element_when_it_already_contains_a_table(self, element: Element):
# -- even a 4-character Text is expected to be refused once a Table is present --
builder = PreChunkBuilder(opts=ChunkingOptions())
builder.add_element(Table("Heading\nCell text"))
assert not builder.will_fit(element)
def it_will_not_fit_an_element_when_it_already_exceeds_the_soft_maxlen(self):
# -- `new_after_n_chars=50` is the soft limit; the 55-char text already exceeds it even
# -- though the hard limit (100) would leave room for the short candidate --
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=100, new_after_n_chars=50))
builder.add_element(
Text("Lorem ipsum dolor sit amet consectetur adipiscing elit.") # 55-chars
)
assert not builder.will_fit(Text("In rhoncus ipsum."))
def and_it_will_not_fit_an_element_when_that_would_cause_it_to_exceed_the_hard_maxlen(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=100))
builder.add_element(
Text("Lorem ipsum dolor sit amet consectetur adipiscing elit.") # 55-chars
)
# -- 55 + 2 (separator) + 44 == 101 --
assert not builder.will_fit(
Text("In rhoncus ipsum sed lectus portos volutpat.") # 44-chars
)
def but_it_will_fit_an_element_that_fits(self):
# -- boundary partner of the test above: one character shorter lands exactly on the limit --
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=100))
builder.add_element(
Text("Lorem ipsum dolor sit amet consectetur adipiscing elit.") # 55-chars
)
# -- 55 + 2 (separator) + 43 == 100 --
assert builder.will_fit(Text("In rhoncus ipsum sed lectus porto volutpat.")) # 43-chars
def it_generates_a_TextPreChunk_when_flushed_and_resets_itself_to_empty(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=150))
builder.add_element(Title("Introduction"))
builder.add_element(
Text(
"Lorem ipsum dolor sit amet consectetur adipiscing elit. In rhoncus ipsum sed"
"lectus porta volutpat.",
),
)
# -- `flush()` returns an iterator; only its first item is taken here --
pre_chunk = next(builder.flush())
assert isinstance(pre_chunk, TextPreChunk)
assert pre_chunk._elements == [
Title("Introduction"),
Text(
"Lorem ipsum dolor sit amet consectetur adipiscing elit. In rhoncus ipsum sed"
"lectus porta volutpat.",
),
]
# -- builder state is back to what a freshly constructed builder reports --
assert builder._text_length == 0
assert builder._remaining_space == 150
def and_it_generates_a_TablePreChunk_when_it_contains_a_Table_element(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=150))
builder.add_element(Table("Heading\nCell text"))
pre_chunk = next(builder.flush())
# -- pre-chunk builder was reset before the yield, such that the iterator does not need to
# -- be exhausted before clearing out the old elements and a new pre-chunk can be
# -- accumulated immediately (first `next()` call is required however, to advance to the
# -- yield statement).
assert builder._text_length == 0
assert builder._remaining_space == 150
# -- pre-chunk is a `TablePreChunk` --
assert isinstance(pre_chunk, TablePreChunk)
assert pre_chunk._table == Table("Heading\nCell text")
def but_it_does_not_generate_a_pre_chunk_on_flush_when_empty(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=150))
# -- the iterator is fully consumed here, so an empty list means nothing was yielded --
pre_chunks = list(builder.flush())
assert pre_chunks == []
assert builder._text_length == 0
assert builder._remaining_space == 150
def it_computes_overlap_from_each_pre_chunk_and_applies_it_to_the_next(self):
# -- with overlap=15, the last 15 characters of each flushed pre-chunk's text are expected
# -- at the front of the next one; per the expected strings below, that prefix is joined
# -- with "\n" ahead of Table text and with "\n\n" ahead of Text --
opts = ChunkingOptions(overlap=15, overlap_all=True)
builder = PreChunkBuilder(opts=opts)
builder.add_element(Text("Lorem ipsum dolor sit amet consectetur adipiscing elit."))
pre_chunk = list(builder.flush())[0]
# -- nothing precedes the first pre-chunk, so it carries no overlap prefix --
assert pre_chunk._text == "Lorem ipsum dolor sit amet consectetur adipiscing elit."
builder.add_element(Table("In rhoncus ipsum sed lectus porta volutpat."))
pre_chunk = list(builder.flush())[0]
assert pre_chunk._text == "dipiscing elit.\nIn rhoncus ipsum sed lectus porta volutpat."
builder.add_element(Text("Donec semper facilisis metus finibus."))
pre_chunk = list(builder.flush())[0]
assert pre_chunk._text == "porta volutpat.\n\nDonec semper facilisis metus finibus."
def it_considers_separator_length_when_computing_text_length_and_remaining_space(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=50))
builder.add_element(Text("abcde"))
builder.add_element(Text("fghij"))
# -- ._text_length includes a separator ("\n\n", len==2) between each text-segment,
# -- so 5 + 2 + 5 = 12 here, not 5 + 5 = 10
assert builder._text_length == 12
# -- ._remaining_space is reduced by the length (2) of the trailing separator which would
# -- go between the current text and that of the next element if one was added.
# -- So 50 - 12 - 2 = 36 here, not 50 - 12 = 38
assert builder._remaining_space == 36
# ================================================================================================
# PRE-CHUNK SUBTYPES
# ================================================================================================
@ -1032,171 +1094,114 @@ class DescribeTextPreChunk:
# ================================================================================================
# PRE-CHUNKING ACCUMULATORS
# PRE-CHUNK SPLITTERS
# ================================================================================================
class DescribePreChunkBuilder:
"""Unit-test suite for `unstructured.chunking.base.PreChunkBuilder`."""
class Describe_TextSplitter:
"""Unit-test suite for `unstructured.chunking.base._TextSplitter` objects."""
def it_is_empty_on_construction(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=50))
assert builder._text_length == 0
assert builder._remaining_space == 50
def it_accumulates_elements_added_to_it(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=150))
builder.add_element(Title("Introduction"))
assert builder._text_length == 12
assert builder._remaining_space == 136
builder.add_element(
Text(
"Lorem ipsum dolor sit amet consectetur adipiscing elit. In rhoncus ipsum sed"
"lectus porta volutpat.",
),
def it_splits_on_a_preferred_separator_when_it_can(self):
opts = ChunkingOptions(max_characters=50, text_splitting_separators=("\n", " "), overlap=10)
split = _TextSplitter(opts)
text = (
"Lorem ipsum dolor amet consectetur adipiscing. \n "
"In rhoncus ipsum sed lectus porta."
)
assert builder._text_length == 112
assert builder._remaining_space == 36
@pytest.mark.parametrize("element", [Table("Heading\nCell text"), Text("abcd " * 200)])
def it_will_fit_a_Table_or_oversized_element_when_empty(self, element: Element):
builder = PreChunkBuilder(opts=ChunkingOptions())
assert builder.will_fit(element)
s, remainder = split(text)
# -- trailing whitespace is stripped from split --
assert s == "Lorem ipsum dolor amet consectetur adipiscing."
# -- leading whitespace is stripped from remainder
# -- overlap is separated by single space
# -- overlap-prefix is computed on arbitrary character boundary
# -- overlap-prefix len includes space separator (text portion is one less than specified)
assert remainder == "ipiscing. In rhoncus ipsum sed lectus porta."
# --
s, remainder = split(remainder)
assert s == "ipiscing. In rhoncus ipsum sed lectus porta."
assert remainder == ""
def and_it_splits_on_the_next_available_separator_when_the_first_is_not_available(self):
opts = ChunkingOptions(max_characters=40, text_splitting_separators=("\n", " "), overlap=10)
split = _TextSplitter(opts)
text = (
"Lorem ipsum dolor amet consectetur adipiscing. In rhoncus ipsum sed lectus porta"
" volutpat."
)
s, remainder = split(text)
assert s == "Lorem ipsum dolor amet consectetur"
assert remainder == "nsectetur adipiscing. In rhoncus ipsum sed lectus porta volutpat."
# --
s, remainder = split(remainder)
assert s == "nsectetur adipiscing. In rhoncus ipsum"
assert remainder == "cus ipsum sed lectus porta volutpat."
# --
s, remainder = split(remainder)
assert s == "cus ipsum sed lectus porta volutpat."
assert remainder == ""
def and_it_splits_on_an_arbitrary_character_as_a_last_resort(self):
opts = ChunkingOptions(max_characters=30, text_splitting_separators=("\n", " "), overlap=10)
split = _TextSplitter(opts)
text = "Loremipsumdolorametconsecteturadipiscingelit. In rhoncus ipsum sed lectus porta."
s, remainder = split(text)
assert s == "Loremipsumdolorametconsectetur"
assert remainder == "onsecteturadipiscingelit. In rhoncus ipsum sed lectus porta."
# --
s, remainder = split(remainder)
assert s == "onsecteturadipiscingelit. In"
assert remainder == "gelit. In rhoncus ipsum sed lectus porta."
# --
s, remainder = split(remainder)
assert s == "gelit. In rhoncus ipsum sed"
assert remainder == "ipsum sed lectus porta."
@pytest.mark.parametrize(
("existing_element", "next_element"),
"text",
[
(Text("abcd"), Table("Fruits\nMango")),
(Text("abcd"), Text("abcd " * 200)),
(Table("Heading\nCell text"), Table("Fruits\nMango")),
(Table("Heading\nCell text"), Text("abcd " * 200)),
"Lorem ipsum dolor amet consectetur adipiscing.", # 46-chars
"Lorem ipsum dolor.", # 18-chars
],
)
def but_not_when_it_already_contains_an_element_of_any_kind(
self, existing_element: Element, next_element: Element
):
builder = PreChunkBuilder(opts=ChunkingOptions())
builder.add_element(existing_element)
def it_does_not_split_a_string_that_is_not_longer_than_maxlen(self, text: str):
opts = ChunkingOptions(max_characters=46, overlap=10)
split = _TextSplitter(opts)
assert not builder.will_fit(next_element)
s, remainder = split(text)
@pytest.mark.parametrize("element", [Text("abcd"), Table("Fruits\nMango")])
def it_will_not_fit_any_element_when_it_already_contains_a_table(self, element: Element):
builder = PreChunkBuilder(opts=ChunkingOptions())
builder.add_element(Table("Heading\nCell text"))
assert s == text
assert remainder == ""
assert not builder.will_fit(element)
def it_fills_the_window_when_falling_back_to_an_arbitrary_character_split(self):
opts = ChunkingOptions(max_characters=38, overlap=10)
split = _TextSplitter(opts)
text = "Loremipsumdolorametconsecteturadipiscingelit. In rhoncus ipsum sed lectus porta."
def it_will_not_fit_an_element_when_it_already_exceeds_the_soft_maxlen(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=100, new_after_n_chars=50))
builder.add_element(
Text("Lorem ipsum dolor sit amet consectetur adipiscing elit.") # 55-chars
)
s, _ = split(text)
assert not builder.will_fit(Text("In rhoncus ipsum."))
assert s == "Loremipsumdolorametconsecteturadipisci"
assert len(s) == 38
def and_it_will_not_fit_an_element_when_that_would_cause_it_to_exceed_the_hard_maxlen(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=100))
builder.add_element(
Text("Lorem ipsum dolor sit amet consectetur adipiscing elit.") # 55-chars
)
@pytest.mark.parametrize("separators", [("\n", " "), (" ",)])
def it_strips_whitespace_around_the_split(self, separators: Sequence[str]):
opts = ChunkingOptions(max_characters=50, text_splitting_separators=separators, overlap=10)
split = _TextSplitter(opts)
text = "Lorem ipsum dolor amet consectetur adipiscing. \n\n In rhoncus ipsum sed lectus."
# |-------------------------------------------------^ 50-chars
# -- 55 + 2 (separator) + 44 == 101 --
assert not builder.will_fit(
Text("In rhoncus ipsum sed lectus portos volutpat.") # 44-chars
)
s, remainder = split(text)
def but_it_will_fit_an_element_that_fits(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=100))
builder.add_element(
Text("Lorem ipsum dolor sit amet consectetur adipiscing elit.") # 55-chars
)
assert s == "Lorem ipsum dolor amet consectetur adipiscing."
assert remainder == "ipiscing. In rhoncus ipsum sed lectus."
# -- 55 + 2 (separator) + 43 == 100 --
assert builder.will_fit(Text("In rhoncus ipsum sed lectus porto volutpat.")) # 43-chars
def it_generates_a_TextPreChunk_when_flushed_and_resets_itself_to_empty(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=150))
builder.add_element(Title("Introduction"))
builder.add_element(
Text(
"Lorem ipsum dolor sit amet consectetur adipiscing elit. In rhoncus ipsum sed"
"lectus porta volutpat.",
),
)
pre_chunk = next(builder.flush())
assert isinstance(pre_chunk, TextPreChunk)
assert pre_chunk._elements == [
Title("Introduction"),
Text(
"Lorem ipsum dolor sit amet consectetur adipiscing elit. In rhoncus ipsum sed"
"lectus porta volutpat.",
),
]
assert builder._text_length == 0
assert builder._remaining_space == 150
def and_it_generates_a_TablePreChunk_when_it_contains_a_Table_element(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=150))
builder.add_element(Table("Heading\nCell text"))
pre_chunk = next(builder.flush())
# -- pre-chunk builder was reset before the yield, such that the iterator does not need to
# -- be exhausted before clearing out the old elements and a new pre-chunk can be
# -- accumulated immediately (first `next()` call is required however, to advance to the
# -- yield statement).
assert builder._text_length == 0
assert builder._remaining_space == 150
# -- pre-chunk is a `TablePreChunk` --
assert isinstance(pre_chunk, TablePreChunk)
assert pre_chunk._table == Table("Heading\nCell text")
def but_it_does_not_generate_a_pre_chunk_on_flush_when_empty(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=150))
pre_chunks = list(builder.flush())
assert pre_chunks == []
assert builder._text_length == 0
assert builder._remaining_space == 150
def it_computes_overlap_from_each_pre_chunk_and_applies_it_to_the_next(self):
opts = ChunkingOptions(overlap=15, overlap_all=True)
builder = PreChunkBuilder(opts=opts)
builder.add_element(Text("Lorem ipsum dolor sit amet consectetur adipiscing elit."))
pre_chunk = list(builder.flush())[0]
assert pre_chunk._text == "Lorem ipsum dolor sit amet consectetur adipiscing elit."
builder.add_element(Table("In rhoncus ipsum sed lectus porta volutpat."))
pre_chunk = list(builder.flush())[0]
assert pre_chunk._text == "dipiscing elit.\nIn rhoncus ipsum sed lectus porta volutpat."
builder.add_element(Text("Donec semper facilisis metus finibus."))
pre_chunk = list(builder.flush())[0]
assert pre_chunk._text == "porta volutpat.\n\nDonec semper facilisis metus finibus."
def it_considers_separator_length_when_computing_text_length_and_remaining_space(self):
builder = PreChunkBuilder(opts=ChunkingOptions(max_characters=50))
builder.add_element(Text("abcde"))
builder.add_element(Text("fghij"))
# -- ._text_length includes a separator ("\n\n", len==2) between each text-segment,
# -- so 5 + 2 + 5 = 12 here, not 5 + 5 = 10
assert builder._text_length == 12
# -- ._remaining_space is reduced by the length (2) of the trailing separator which would
# -- go between the current text and that of the next element if one was added.
# -- So 50 - 12 - 2 = 36 here, not 50 - 12 = 38
assert builder._remaining_space == 36
# ================================================================================================
# PRE-CHUNK COMBINER
# ================================================================================================
class DescribePreChunkCombiner:

View File

View File

@ -0,0 +1,33 @@
"""Unit-test suite for the `unstructured.common.html_table` module."""
from __future__ import annotations
from unstructured.common.html_table import htmlify_matrix_of_cell_texts
class Describe_htmlify_matrix_of_cell_texts:
"""Unit-test suite for `unstructured.common.html_table.htmlify_matrix_of_cell_texts()`."""
# -- Each test passes a list of rows (each row a list of cell strings) and compares the
# -- returned HTML against an exact expected string.
def test_htmlify_matrix_handles_empty_cells(self):
# -- an empty cell string is expected to render as an empty `<td></td>`, not be dropped --
assert htmlify_matrix_of_cell_texts([["cell1", "", "cell3"], ["", "cell5", ""]]) == (
"<table>"
"<tr><td>cell1</td><td></td><td>cell3</td></tr>"
"<tr><td></td><td>cell5</td><td></td></tr>"
"</table>"
)
def test_htmlify_matrix_handles_special_characters(self):
# -- expects `<`, `>`, `&` and `"` as HTML entities and the trailing "\n" as `<br/>` --
assert htmlify_matrix_of_cell_texts([['<>&"', "newline\n"]]) == (
"<table><tr><td>&lt;&gt;&amp;&quot;</td><td>newline<br/></td></tr></table>"
)
def test_htmlify_matrix_handles_multiple_rows_and_cells(self):
# -- one `<tr>` per inner list and one `<td>` per cell, in input order --
assert htmlify_matrix_of_cell_texts([["cell1", "cell2"], ["cell3", "cell4"]]) == (
"<table>"
"<tr><td>cell1</td><td>cell2</td></tr>"
"<tr><td>cell3</td><td>cell4</td></tr>"
"</table>"
)
def test_htmlify_matrix_handles_empty_matrix(self):
# -- an empty matrix is expected to give an empty string rather than an empty `<table>` --
assert htmlify_matrix_of_cell_texts([]) == ""

View File

@ -339,30 +339,6 @@ def test_validate_date_args_raises_for_invalid_formats(date):
assert utils.validate_date_args(date)
# An empty cell string is expected to render as an empty `<td></td>` rather than be dropped.
def test_htmlify_matrix_handles_empty_cells():
assert utils.htmlify_matrix_of_cell_texts([["cell1", "", "cell3"], ["", "cell5", ""]]) == (
"<table><tr><td>cell1</td><td></td><td>cell3</td></tr>"
"<tr><td></td><td>cell5</td><td></td></tr></table>"
)
# Expects `<`, `>`, `&` and `"` to come back as HTML entities and a trailing "\n" as `<br/>`.
def test_htmlify_matrix_handles_special_characters():
assert utils.htmlify_matrix_of_cell_texts([['<>&"', "newline\n"]]) == (
"<table><tr><td>&lt;&gt;&amp;&quot;</td><td>newline<br/></td></tr></table>"
)
# Expects one `<tr>` per inner list and one `<td>` per cell, in input order.
def test_htmlify_matrix_handles_multiple_rows_and_cells():
assert utils.htmlify_matrix_of_cell_texts([["cell1", "cell2"], ["cell3", "cell4"]]) == (
"<table><tr><td>cell1</td><td>cell2</td></tr>"
"<tr><td>cell3</td><td>cell4</td></tr></table>"
)
# An empty matrix is expected to give an empty string rather than an empty `<table>`.
def test_htmlify_matrix_handles_empty_matrix():
assert utils.htmlify_matrix_of_cell_texts([]) == ""
def test_only_returns_singleton_iterable():
singleton_iterable = [42]
result = utils.only(singleton_iterable)

View File

@ -6,7 +6,7 @@ from typing import Any, Callable, Collection, Protocol, TypeVar
from typing_extensions import TypeAlias
from .etree import QName, _Element, _ElementTree
from .etree import HTMLParser, QName, XMLParser, _Element, _ElementTree
_ET = TypeVar("_ET", bound=_Element, default=_Element)
_ET_co = TypeVar("_ET_co", bound=_Element, default=_Element, covariant=True)
@ -30,5 +30,8 @@ _TextArg: TypeAlias = str | bytes | QName
_XPathObject = Any
# The basic parsers bundled in lxml.etree
_DefEtreeParsers = XMLParser[_ET_co] | HTMLParser[_ET_co]
class SupportsLaxedItems(Protocol[_KT_co, _VT_co]):
def items(self) -> Collection[tuple[_KT_co, _VT_co]]: ...

View File

@ -2,11 +2,12 @@
from __future__ import annotations
from typing import Collection, Generic, Iterator, TypeVar, overload
from typing import Collection, Generic, Iterable, Iterator, TypeVar, overload
from typing_extensions import Self
from .. import _types as _t
from ._module_misc import CDATA, QName
_T = TypeVar("_T")
@ -23,6 +24,12 @@ class _Element:
def get(self, key: _t._AttrName) -> str | None: ...
@overload
def get(self, key: _t._AttrName, default: _T) -> str | _T: ...
@overload
def iter(self, *tags: _t._TagSelector) -> Iterator[Self]: ...
@overload
def iter(
self, *, tag: _t._TagSelector | Iterable[_t._TagSelector] | None = None
) -> Iterator[Self]: ...
def iterancestors(
self, *, tag: _t._TagSelector | Collection[_t._TagSelector] | None = None
) -> Iterator[Self]: ...
@ -39,8 +46,12 @@ class _Element:
def tag(self) -> str: ...
@property
def tail(self) -> str | None: ...
@tail.setter
def tail(self, value: str | CDATA | None) -> None: ...
@property
def text(self) -> str | None: ...
@text.setter
def text(self, value: str | QName | CDATA | None) -> None: ...
def xpath(
self,
_path: str,

View File

@ -2,4 +2,7 @@
from __future__ import annotations
# Type stub: declares only that a `CDATA` is constructed from a single `str`.
class CDATA:
def __init__(self, data: str) -> None: ...
# Type stub: opaque placeholder with no declared members, so other stubs can name the type.
class QName: ...

View File

@ -1,8 +1,16 @@
# pyright: reportPrivateUsage=false
from __future__ import annotations
from typing import Generic
from .._types import _ET_co
from ._classlookup import ElementClassLookup
class HTMLParser:
# Includes most stuff in _BaseParser
class _FeedParser(Generic[_ET_co]): ...
class HTMLParser(_FeedParser[_ET_co]):
def __init__(
self,
*,
@ -20,7 +28,7 @@ class HTMLParser:
) -> None: ...
def set_element_class_lookup(self, lookup: ElementClassLookup | None = None) -> None: ...
class XMLParser:
class XMLParser(_FeedParser[_ET_co]):
def __init__(
self,
*,

View File

@ -0,0 +1,8 @@
from __future__ import annotations
from ._element import (
HtmlElement as HtmlElement,
)
from ._parse import (
fragment_fromstring as fragment_fromstring,
)

View File

@ -0,0 +1,20 @@
# pyright: reportPrivateUsage=false
from __future__ import annotations
from typing import TYPE_CHECKING
from .._types import _DefEtreeParsers
from ._element import HtmlElement
if TYPE_CHECKING:
from typing_extensions import TypeAlias
_HtmlElemParser: TypeAlias = _DefEtreeParsers[HtmlElement]
# Type stub (signature only): takes an HTML string plus optional settings and is declared to
# return an `HtmlElement`. `parser`, when given, must be an XML or HTML parser parameterized on
# `HtmlElement` (see `_HtmlElemParser`).
# NOTE(review): presumably mirrors `lxml.html.fragment_fromstring` -- confirm parameter names and
# defaults against the lxml documentation.
def fragment_fromstring(
html: str,
create_parent: bool = False,
base_url: str | None = None,
parser: _HtmlElemParser | None = None,
) -> HtmlElement: ...

View File

@ -1,8 +1,5 @@
from __future__ import annotations
from pandas.core.api import (
DataFrame as DataFrame,
)
from pandas.io.api import (
read_csv as read_csv,
)
from pandas.core.api import DataFrame as DataFrame
from pandas.io.api import read_csv as read_csv
from pandas.io.api import read_excel as read_excel

View File

@ -0,0 +1,20 @@
from __future__ import annotations
from os import PathLike
from typing import Protocol, TypeVar
from typing_extensions import TypeAlias
AnyStr_cov = TypeVar("AnyStr_cov", str, bytes, covariant=True)
FilePath: TypeAlias = str | PathLike[str]
S1 = TypeVar("S1")
# Structural (Protocol) type for a seekable file-like object: anything exposing a `mode` string
# plus `seek`, `seekable` and `tell` satisfies it, with no inheritance required.
# `= ...` marks a parameter as having a default whose value the stub does not spell out.
class BaseBuffer(Protocol):
@property
def mode(self) -> str: ...
def seek(self, __offset: int, __whence: int = ...) -> int: ...
def seekable(self) -> bool: ...
def tell(self) -> int: ...
# A `BaseBuffer` that can also be read. Generic over what `read` returns: `AnyStr_cov` is
# constrained to `str` or `bytes`, so `ReadBuffer[bytes]` describes a binary readable buffer.
class ReadBuffer(BaseBuffer, Protocol[AnyStr_cov]):
def read(self, __n: int = ...) -> AnyStr_cov: ...

View File

@ -1,9 +1,29 @@
# pyright: reportPrivateUsage=false
from __future__ import annotations
from typing import Any, Hashable, Iterable
from pandas.core.indexing import _iLocIndexer
from pandas.core.series import Series
# Partial type stub for `DataFrame`: only the members listed here are declared.
# NOTE(review): presumably limited to what this project actually uses -- confirm before relying on
# any other attribute being typed.
class DataFrame:
def __getitem__(self, key: Iterable[Hashable] | slice) -> DataFrame: ...
def __len__(self) -> int: ...
@property
def T(self) -> DataFrame: ...
# `iloc` is typed with the frame-specific indexer so that indexing it yields a `DataFrame`.
@property
def iloc(self) -> _iLocIndexerFrame: ...
def isna(self) -> DataFrame: ...
def iterrows(self) -> Iterable[tuple[Hashable, Series[Any]]]: ...
@property
def shape(self) -> tuple[int, int]: ...
# Only these three keyword options are declared; `= ...` leaves their default values unstated.
def to_html(
self,
index: bool = ...,
header: bool = ...,
na_rep: str = ...,
) -> str: ...
# Narrows `_iLocIndexer` for frames: any index expression is declared to produce a `DataFrame`.
class _iLocIndexerFrame(_iLocIndexer):
def __getitem__(self, idx: Any) -> DataFrame: ...

View File

@ -1,5 +1,4 @@
from __future__ import annotations
from pandas.io.parsers import (
read_csv as read_csv,
)
from pandas.io.excel import read_excel as read_excel
from pandas.io.parsers import read_csv as read_csv

View File

@ -0,0 +1 @@
from pandas.io.excel._base import read_excel as read_excel

View File

@ -0,0 +1,13 @@
from __future__ import annotations
from typing import Sequence
from pandas._typing import FilePath, ReadBuffer
from pandas.core.frame import DataFrame
# Type stub covering only the `sheet_name=None` call form, which is declared to return a dict
# mapping `str` keys to `DataFrame`s. `sheet_name` has no default here, so a call that omits it
# does not match this signature. `header` is keyword-only.
# NOTE(review): presumably the keys are sheet names -- confirm against the pandas documentation.
def read_excel(
io: FilePath | ReadBuffer[bytes],
sheet_name: None,
*,
header: int | Sequence[int] | None = ...,
) -> dict[str, DataFrame]: ...

View File

@ -1 +1 @@
__version__ = "0.15.2-dev6" # pragma: no cover
__version__ = "0.15.2-dev7" # pragma: no cover

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import collections
import copy
from typing import Any, Callable, DefaultDict, Iterable, Iterator, Optional, cast
from typing import Any, Callable, DefaultDict, Iterable, Iterator, cast
import regex
from typing_extensions import Self, TypeAlias
@ -21,14 +21,16 @@ from unstructured.documents.elements import (
)
from unstructured.utils import lazyproperty
# -- CONSTANTS -----------------------------------
# ================================================================================================
# MODEL
# ================================================================================================
CHUNK_MAX_CHARS_DEFAULT: int = 500
"""Hard-max chunk-length when no explicit value specified in `max_characters` argument.
Provided for reference only, for example so the ingest CLI can advertise the default value in its
UI. External chunking-related functions (e.g. in ingest or decorators) should use
`max_characters: Optional[int] = None` and not apply this default themselves. Only
`max_characters: int | None = None` and not apply this default themselves. Only
`ChunkingOptions.max_characters` should apply a default value.
"""
@ -38,9 +40,6 @@ CHUNK_MULTI_PAGE_DEFAULT: bool = True
Only operative for "by_title" chunking strategy.
"""
# -- TYPES ---------------------------------------
BoundaryPredicate: TypeAlias = Callable[[Element], bool]
"""Detects when element represents crossing a semantic boundary like section or page."""
@ -237,122 +236,6 @@ class ChunkingOptions:
)
class _TextSplitter:
"""Provides a text-splitting function configured on construction.
Text is split on the best-available separator, falling-back from the preferred separator
through a sequence of alternate separators.
- The separator is removed by splitting so only whitespace strings are suitable separators.
- A "blank-line" ("\n\n") is unlikely to occur in an element as it would have been used as an
element boundary during partitioning.
This is a *callable* object. Constructing it essentially produces a function:
split = _TextSplitter(opts)
fragment, remainder = split(s)
This allows it to be configured with length-options etc. on construction and used throughout a
chunking operation on a given element-stream.
"""
# -- retains the options object; `hard_max` and `overlap` are read from it on each call --
def __init__(self, opts: ChunkingOptions):
self._opts = opts
def __call__(self, s: str) -> tuple[str, str]:
"""Return pair of strings split from `s` on the best match of configured patterns.
The first string is the split, the second is the remainder of the string. The split string
will never be longer than `maxlen`. The separators are tried in order until a match is
found. The last separator is "" which matches between any two characters so there will
always be a split.
The separator is removed and does not appear in the split or remainder.
An `s` that is already less than the maximum length is returned unchanged with no remainder.
This allows this function to be called repeatedly with the remainder until it is consumed
and returns a remainder of "".
"""
maxlen = self._opts.hard_max
if len(s) <= maxlen:
return s, ""
for p, sep_len in self._patterns:
# -- length of separator must be added to include that separator when it happens to be
# -- located exactly at maxlen. Otherwise the search-from-end regex won't find it.
fragment, remainder = self._split_from_maxlen(p, sep_len, s)
if (
# -- no available split with this separator --
not fragment
# -- split did not progress, consuming part of the string --
or len(remainder) >= len(s)
):
continue
return fragment.rstrip(), remainder.lstrip()
# -- the terminal "" pattern is not actually executed via regex since its implementation is
# -- trivial and provides a hard back-stop here in this method. No separator is used between
# -- tail and remainder on arb-char split.
return s[:maxlen].rstrip(), s[maxlen - self._opts.overlap :].lstrip()
@lazyproperty
def _patterns(self) -> tuple[tuple[regex.Pattern[str], int], ...]:
"""Sequence of (pattern, len) pairs to match against.
Patterns appear in order of preference, those following are "fall-back" patterns to be used
if no match of a prior pattern is found.
NOTE these regexes search *from the end of the string*, which is what the "(?r)" bit
specifies. This is much more efficient than starting at the beginning of the string which
could result in hundreds of matches before the desired one.
"""
separators = self._opts.text_splitting_separators
return tuple((regex.compile(f"(?r){sep}"), len(sep)) for sep in separators)
def _split_from_maxlen(
self, pattern: regex.Pattern[str], sep_len: int, s: str
) -> tuple[str, str]:
"""Return (split, remainder) pair split from `s` on the right-most match before `maxlen`.
Returns `"", s` if no suitable match was found. Also returns `"", s` if splitting on this
separator produces a split shorter than the required overlap (which would produce an
infinite loop).
`split` will never be longer than `maxlen` and there is no longer split available using
`pattern`.
The separator is removed and does not appear in either the split or remainder.
"""
maxlen, overlap = self._opts.hard_max, self._opts.overlap
# -- A split not longer than overlap will not progress (infinite loop). On the right side,
# -- need to extend search range to include a separator located exactly at maxlen.
match = pattern.search(s, pos=overlap + 1, endpos=maxlen + sep_len)
if match is None:
return "", s
# -- characterize match location
match_start, match_end = match.span()
# -- matched separator is replaced by single-space in overlap string --
separator = " "
# -- in multi-space situation, fragment may have trailing whitespace because match is from
# -- right to left
fragment = s[:match_start].rstrip()
# -- remainder can have leading space when match is on "\n" followed by spaces --
raw_remainder = s[match_end:].lstrip()
if overlap <= len(separator):
return fragment, raw_remainder
# -- compute overlap --
tail_len = overlap - len(separator)
tail = fragment[-tail_len:].lstrip()
overlapped_remainder = tail + separator + raw_remainder
return fragment, overlapped_remainder
# ================================================================================================
# PRE-CHUNKER
# ================================================================================================
@ -428,6 +311,121 @@ class PreChunker:
return any(semantic_boundaries)
class PreChunkBuilder:
    """Accumulates elements one at a time until they form a pre-chunk.

    A pre-chunker asks `.will_fit()` whether the next element in the stream may be added, calls
    `.add_element()` when it may, and calls `.flush()` when it may not or when a boundary is
    reached.

    `.flush()` is a generator producing zero-or-one `TextPreChunk` or `TablePreChunk`, intended
    to be used as:

        yield from builder.flush()

    Nothing is produced when no elements have been accumulated. Flushing also empties the
    builder, seeding it with the overlap tail of the pre-chunk just produced, so it is ready to
    start on the next pre-chunk.
    """

    def __init__(self, opts: ChunkingOptions) -> None:
        self._opts = opts
        self._separator_len = len(opts.text_separator)
        self._elements: list[Element] = []
        # -- overlap only arises between pre-chunks, so the first pre-chunk has none --
        self._overlap_prefix: str = ""
        # -- non-empty texts only; an element with text == "" (e.g. PageBreak) adds nothing --
        self._text_segments: list[str] = []
        # -- summed length of the text segments, separators excluded --
        self._text_len: int = 0

    def add_element(self, element: Element) -> None:
        """Append `element` to the pre-chunk under construction."""
        self._elements.append(element)
        text = element.text
        # -- text-less elements are kept as elements but do not count toward text length --
        if not text:
            return
        self._text_segments.append(text)
        self._text_len += len(text)

    def flush(self) -> Iterator[PreChunk]:
        """Generate zero-or-one `PreChunk` from the accumulated elements and reset the builder.

        Use it when the size limit or a semantic boundary has been reached, and once more at the
        end of the element stream to emit the final pre-chunk.
        """
        if not self._elements:
            return

        first = self._elements[0]
        if isinstance(first, Table):
            pre_chunk = TablePreChunk(first, self._overlap_prefix, self._opts)
        else:
            # -- pass a copy; the builder's own list is cleared in place just below --
            pre_chunk = TextPreChunk(list(self._elements), self._overlap_prefix, self._opts)

        # -- reset BEFORE yielding so the builder can accept elements for the next pre-chunk no
        # -- matter when (or whether) the caller resumes this generator --
        self._reset_state(pre_chunk.overlap_tail)
        yield pre_chunk

    def will_fit(self, element: Element) -> bool:
        """True when `element` may be added to this pre-chunk without breaking a limit.

        The rules, in the order they are applied:
        - An empty pre-chunk accepts any element, even one that is oversized.
        - A `Table` never shares a pre-chunk: it is refused by a non-empty pre-chunk, and a
          pre-chunk that starts with a `Table` refuses everything else.
        - A pre-chunk whose text already exceeds the soft-max (aka. new_after_n_chars) is full.
        - Otherwise the element fits only when adding it keeps the text within the hard-max
          (aka. max_characters).
        """
        if not self._elements:
            return True

        # -- tables travel alone, whether incoming or already present --
        if isinstance(element, Table) or isinstance(self._elements[0], Table):
            return False

        # -- past the soft-max the pre-chunk is treated as full --
        if self._text_length > self._opts.soft_max:
            return False

        # -- finally, the element's text must fit in the space left under the hard-max --
        return len(element.text) <= self._remaining_space

    @property
    def _remaining_space(self) -> int:
        """Longest element text that can still be added without exceeding the hard-max."""
        # -- one separator per existing segment: those between the segments plus the one that
        # -- would precede the text of the next element --
        used = self._text_len + self._separator_len * len(self._text_segments)
        return self._opts.hard_max - used

    def _reset_state(self, overlap_prefix: str) -> None:
        """Empty the builder, seeding it with `overlap_prefix` for the next pre-chunk."""
        self._elements.clear()
        self._overlap_prefix = overlap_prefix
        self._text_len = len(overlap_prefix)
        self._text_segments = [overlap_prefix] if overlap_prefix else []

    @property
    def _text_length(self) -> int:
        """Length of the text this pre-chunk would have if flushed right now.

        Counts the text segments plus the separators *between* them. It does not count a
        trailing separator, since that only appears once another element is added. For that
        reason it is the wrong measure for judging remaining room; use `._remaining_space`.
        """
        # -- zero separators for both zero and one segment, otherwise one fewer than segments --
        separator_count = max(len(self._text_segments) - 1, 0)
        return self._text_len + separator_count * self._separator_len
# ================================================================================================
# PRE-CHUNK SUB-TYPES
# ================================================================================================
@ -793,126 +791,129 @@ class TextPreChunk:
# ================================================================================================
# PRE-CHUNKING ACCUMULATORS
# ------------------------------------------------------------------------------------------------
# Accumulators encapsulate the work of grouping elements and later pre-chunks to form the larger
# pre-chunk and combined-pre-chunk items central to unstructured chunking.
# PRE-CHUNK SPLITTERS
# ================================================================================================
class PreChunkBuilder:
"""An element accumulator suitable for incrementally forming a pre-chunk.
class _TextSplitter:
"""Provides a text-splitting function configured on construction.
Provides the trial method `.will_fit()` a pre-chunker can use to determine whether it should add
the next element in the element stream.
Text is split on the best-available separator, falling-back from the preferred separator
through a sequence of alternate separators.
`.flush()` is used to build a PreChunk object from the accumulated elements. This method
returns an iterator that generates zero-or-one `TextPreChunk` or `TablePreChunk` object and is
used like so:
- The separator is removed by splitting so only whitespace strings are suitable separators.
- A "blank-line" ("\n\n") is unlikely to occur in an element as it would have been used as an
element boundary during partitioning.
yield from builder.flush()
This is a *callable* object. Constructing it essentially produces a function:
If no elements have been accumulated, no `PreChunk` instance is generated. Flushing the builder
clears the elements it contains so it is ready to build the next pre-chunk.
split = _TextSplitter(opts)
fragment, remainder = split(s)
This allows it to be configured with length-options etc. on construction and used throughout a
chunking operation on a given element-stream.
"""
def __init__(self, opts: ChunkingOptions) -> None:
def __init__(self, opts: ChunkingOptions):
self._opts = opts
self._separator_len = len(opts.text_separator)
self._elements: list[Element] = []
# -- overlap is only between pre-chunks so starts empty --
self._overlap_prefix: str = ""
# -- only includes non-empty element text, e.g. PageBreak.text=="" is not included --
self._text_segments: list[str] = []
# -- combined length of text-segments, not including separators --
self._text_len: int = 0
def __call__(self, s: str) -> tuple[str, str]:
"""Return pair of strings split from `s` on the best match of configured patterns.
def add_element(self, element: Element) -> None:
"""Add `element` to this section."""
self._elements.append(element)
if element.text:
self._text_segments.append(element.text)
self._text_len += len(element.text)
The first string is the split, the second is the remainder of the string. The split string
will never be longer than `maxlen`. The separators are tried in order until a match is
found. The last separator is "" which matches between any two characters so there will
always be a split.
def flush(self) -> Iterator[PreChunk]:
"""Generate zero-or-one `PreChunk` object and clear the accumulator.
The separator is removed and does not appear in the split or remainder.
Suitable for use to emit a PreChunk when the maximum size has been reached or a semantic
boundary has been reached. Also to clear out a terminal pre-chunk at the end of an element
stream.
An `s` that is already less than the maximum length is returned unchanged with no remainder.
This allows this function to be called repeatedly with the remainder until it is consumed
and returns a remainder of "".
"""
if not self._elements:
return
maxlen = self._opts.hard_max
pre_chunk = (
TablePreChunk(self._elements[0], self._overlap_prefix, self._opts)
if isinstance(self._elements[0], Table)
# -- copy list, don't use original or it may change contents as builder proceeds --
else TextPreChunk(list(self._elements), self._overlap_prefix, self._opts)
)
# -- clear builder before yield so we're not sensitive to the timing of how/when this
# -- iterator is exhausted and can add elements for the next pre-chunk immediately.
self._reset_state(pre_chunk.overlap_tail)
yield pre_chunk
if len(s) <= maxlen:
return s, ""
def will_fit(self, element: Element) -> bool:
"""True when `element` can be added to this prechunk without violating its limits.
for p, sep_len in self._patterns:
# -- length of separator must be added to include that separator when it happens to be
# -- located exactly at maxlen. Otherwise the search-from-end regex won't find it.
fragment, remainder = self._split_from_maxlen(p, sep_len, s)
if (
# -- no available split with this separator --
not fragment
# -- split did not progress, consuming part of the string --
or len(remainder) >= len(s)
):
continue
return fragment.rstrip(), remainder.lstrip()
There are several limits:
- A `Table` element will never fit with any other element. It will only fit in an empty
pre-chunk.
- No element will fit in a pre-chunk that already contains a `Table` element.
- A text-element will not fit in a pre-chunk that already exceeds the soft-max
(aka. new_after_n_chars).
- A text-element will not fit when together with the elements already present it would
exceed the hard-max (aka. max_characters).
# -- the terminal "" pattern is not actually executed via regex since its implementation is
# -- trivial and provides a hard back-stop here in this method. No separator is used between
# -- tail and remainder on arb-char split.
return s[:maxlen].rstrip(), s[maxlen - self._opts.overlap :].lstrip()
@lazyproperty
def _patterns(self) -> tuple[tuple[regex.Pattern[str], int], ...]:
"""Sequence of (pattern, len) pairs to match against.
Patterns appear in order of preference, those following are "fall-back" patterns to be used
if no match of a prior pattern is found.
NOTE these regexes search *from the end of the string*, which is what the "(?r)" bit
specifies. This is much more efficient than starting at the beginning of the string which
could result in hundreds of matches before the desired one.
"""
# -- an empty pre-chunk will accept any element (including an oversized-element) --
if len(self._elements) == 0:
return True
# -- a `Table` will not fit in a non-empty pre-chunk --
if isinstance(element, Table):
return False
# -- no element will fit in a pre-chunk that already contains a `Table` element --
if isinstance(self._elements[0], Table):
return False
# -- a pre-chunk that already exceeds the soft-max is considered "full" --
if self._text_length > self._opts.soft_max:
return False
# -- don't add an element if it would increase total size beyond the hard-max --
return not self._remaining_space < len(element.text)
separators = self._opts.text_splitting_separators
return tuple((regex.compile(f"(?r){sep}"), len(sep)) for sep in separators)
@property
def _remaining_space(self) -> int:
"""Maximum text-length of an element that can be added without exceeding maxlen."""
# -- include length of trailing separator that will go before next element text --
separators_len = self._separator_len * len(self._text_segments)
return self._opts.hard_max - self._text_len - separators_len
def _split_from_maxlen(
self, pattern: regex.Pattern[str], sep_len: int, s: str
) -> tuple[str, str]:
"""Return (split, remainder) pair split from `s` on the right-most match before `maxlen`.
def _reset_state(self, overlap_prefix: str) -> None:
"""Set working-state values back to "empty", ready to accumulate next pre-chunk."""
self._overlap_prefix = overlap_prefix
self._elements.clear()
self._text_segments = [overlap_prefix] if overlap_prefix else []
self._text_len = len(overlap_prefix)
Returns `"", s` if no suitable match was found. Also returns `"", s` if splitting on this
separator produces a split shorter than the required overlap (which would produce an
infinite loop).
@property
def _text_length(self) -> int:
"""Length of the text in this pre-chunk.
`split` will never be longer than `maxlen` and there is no longer split available using
`pattern`.
This value represents the chunk-size that would result if this pre-chunk was flushed in its
current state. In particular, it does not include the length of a trailing separator (since
that would only appear if an additional element was added).
Not suitable for judging remaining space, use `.remaining_space` for that value.
The separator is removed and does not appear in either the split or remainder.
"""
# -- number of text separators present in joined text of elements. This includes only
# -- separators *between* text segments, not one at the end. Note there are zero separators
# -- for both 0 and 1 text-segments.
n = len(self._text_segments)
separator_count = n - 1 if n else 0
return self._text_len + (separator_count * self._separator_len)
maxlen, overlap = self._opts.hard_max, self._opts.overlap
# -- A split not longer than overlap will not progress (infinite loop). On the right side,
# -- need to extend search range to include a separator located exactly at maxlen.
match = pattern.search(s, pos=overlap + 1, endpos=maxlen + sep_len)
if match is None:
return "", s
# -- characterize match location
match_start, match_end = match.span()
# -- matched separator is replaced by single-space in overlap string --
separator = " "
# -- in multi-space situation, fragment may have trailing whitespace because match is from
# -- right to left
fragment = s[:match_start].rstrip()
# -- remainder can have leading space when match is on "\n" followed by spaces --
raw_remainder = s[match_end:].lstrip()
if overlap <= len(separator):
return fragment, raw_remainder
# -- compute overlap --
tail_len = overlap - len(separator)
tail = fragment[-tail_len:].lstrip()
overlapped_remainder = tail + separator + raw_remainder
return fragment, overlapped_remainder
# ================================================================================================
# PRE-CHUNK COMBINER
# ================================================================================================
class PreChunkCombiner:
@ -966,7 +967,7 @@ class TextPreChunkAccumulator:
def __init__(self, opts: ChunkingOptions) -> None:
self._opts = opts
self._pre_chunk: Optional[TextPreChunk] = None
self._pre_chunk: TextPreChunk | None = None
def add_pre_chunk(self, pre_chunk: TextPreChunk) -> None:
"""Add a pre-chunk to the accumulator for possible combination with next pre-chunk."""

View File

View File

@ -0,0 +1,39 @@
"""Provides operations related to the HTML table stored in `.metadata.text_as_html`.
Used during partitioning as well as chunking.
"""
from __future__ import annotations
import html
from typing import Iterator, Sequence
def htmlify_matrix_of_cell_texts(matrix: Sequence[Sequence[str]]) -> str:
    """Form an HTML table from the rows and columns of `matrix`.

    `matrix` is a sequence of rows, each row a sequence of cell-text strings. An empty `matrix`
    produces the empty string rather than an empty table. A row with no cells is skipped.

    The markup is kept as compact as possible:

    - No whitespace padding is added for human readability.
    - No line-feeds are added between tags.
    - No `<thead>`, `<tbody>`, or `<tfoot>` elements are used; there is no way to tell where
      those would be semantically appropriate, so at best they would consume unnecessary space
      and at worst they would be misleading.
    """
    if not matrix:
        return ""

    def td(cell_text: str) -> str:
        # -- escape first so characters like '<' and '>' in the text cannot form markup --
        escaped = html.escape(cell_text)
        # -- each line-feed in the text becomes a <br/> element --
        with_breaks = escaped.replace("\n", "<br/>")
        # -- whitespace is trimmed only after line-feeds are converted, so a leading or trailing
        # -- line-feed survives as a <br/> --
        return f"<td>{with_breaks.strip()}</td>"

    def tr(cell_texts: Sequence[str]) -> str:
        return f"<tr>{''.join(td(t) for t in cell_texts)}</tr>"

    # -- rows with no cells are dropped rather than emitted as an empty <tr> --
    rows_html = "".join(tr(row) for row in matrix if row)
    return f"<table>{rows_html}</table>"

View File

@ -83,6 +83,7 @@ from lxml import etree
from typing_extensions import TypeAlias
from unstructured.cleaners.core import clean_bullets
from unstructured.common.html_table import htmlify_matrix_of_cell_texts
from unstructured.documents.elements import (
Address,
Element,
@ -101,7 +102,7 @@ from unstructured.partition.text_type import (
is_possible_title,
is_us_city_state_zip,
)
from unstructured.utils import htmlify_matrix_of_cell_texts, lazyproperty
from unstructured.utils import lazyproperty
# ------------------------------------------------------------------------------------------------
# DOMAIN MODEL

View File

@ -4,12 +4,12 @@ from __future__ import annotations
import io
from tempfile import SpooledTemporaryFile
from typing import IO, Any, Iterator, Optional, cast
from typing import IO, Any, Iterator, Optional
import networkx as nx
import numpy as np
import pandas as pd
from lxml.html.soupparser import fromstring as soupparser_fromstring # pyright: ignore
from lxml.html.soupparser import fromstring as soupparser_fromstring
from typing_extensions import Self, TypeAlias
from unstructured.chunking import add_chunking_strategy
@ -110,19 +110,12 @@ def partition_xlsx(
):
if not opts.find_subtable:
html_text = (
sheet.to_html( # pyright: ignore[reportUnknownMemberType]
index=False, header=opts.include_header, na_rep=""
)
sheet.to_html(index=False, header=opts.include_header, na_rep="")
if opts.infer_table_structure
else None
)
# XXX: `html_text` can be `None`. What happens on this call in that case?
text = cast(
str,
soupparser_fromstring( # pyright: ignore[reportUnknownMemberType]
html_text
).text_content(),
)
text = soupparser_fromstring(html_text).text_content()
if opts.include_metadata:
metadata = ElementMetadata(
@ -151,15 +144,10 @@ def partition_xlsx(
# -- emit core-table (if it exists) as a `Table` element --
core_table = subtable_parser.core_table
if core_table is not None:
html_text = core_table.to_html( # pyright: ignore[reportUnknownMemberType]
html_text = core_table.to_html(
index=False, header=opts.include_header, na_rep=""
)
text = cast(
str,
soupparser_fromstring( # pyright: ignore[reportUnknownMemberType]
html_text
).text_content(),
)
text = soupparser_fromstring(html_text).text_content()
element = Table(text=text)
element.metadata = _get_metadata(sheet_name, page_number, opts)
element.metadata.text_as_html = (
@ -285,17 +273,13 @@ class _XlsxPartitionerOptions:
def sheets(self) -> dict[str, pd.DataFrame]:
"""The spreadsheet worksheets, each as a data-frame mapped by sheet-name."""
if file_path := self._file_path:
return pd.read_excel( # pyright: ignore[reportUnknownMemberType]
file_path, sheet_name=None, header=self.header_row_idx
)
return pd.read_excel(file_path, sheet_name=None, header=self.header_row_idx)
if f := self._file:
if isinstance(f, SpooledTemporaryFile):
f.seek(0)
f = io.BytesIO(f.read())
return pd.read_excel( # pyright: ignore[reportUnknownMemberType]
f, sheet_name=None, header=self.header_row_idx
)
return pd.read_excel(f, sheet_name=None, header=self.header_row_idx)
raise ValueError("Either 'filename' or 'file' argument must be specified.")
@ -383,7 +367,7 @@ class _ConnectedComponents:
max_row, max_col = self._worksheet_df.shape
node_array = np.indices((max_row, max_col)).T
empty_cells = self._worksheet_df.isna().T
nodes_to_remove = [tuple(pair) for pair in node_array[empty_cells]]
nodes_to_remove = [tuple(pair) for pair in node_array[empty_cells]] # pyright: ignore
graph: nx.Graph = nx.grid_2d_graph(max_row, max_col) # pyright: ignore
graph.remove_nodes_from(nodes_to_remove) # pyright: ignore
@ -499,7 +483,7 @@ class _SubtableParser:
"""Index of each single-cell row in subtable, in top-down order."""
def iter_single_cell_row_idxs() -> Iterator[int]:
for idx, (_, row) in enumerate(self._subtable.iterrows()): # pyright: ignore
for idx, (_, row) in enumerate(self._subtable.iterrows()):
if row.count() != 1:
continue
yield idx

View File

@ -2,7 +2,6 @@ from __future__ import annotations
import asyncio
import functools
import html
import importlib
import inspect
import json
@ -23,7 +22,6 @@ from typing import (
Iterator,
List,
Optional,
Sequence,
Tuple,
TypeVar,
cast,
@ -62,36 +60,6 @@ def get_call_args_applying_defaults(
return call_args
def htmlify_matrix_of_cell_texts(matrix: Sequence[Sequence[str]]) -> str:
    """Form an HTML table from the rows and columns of `matrix`.

    `matrix` is a sequence of rows, each row a sequence of cell-text strings. An empty `matrix`
    produces the empty string rather than an empty table. A row with no cells is skipped.

    Character overhead is minimized:

    - No whitespace padding is added for human readability.
    - No line-feeds are added between tags.
    - No `<thead>`, `<tbody>`, or `<tfoot>` elements are used; there is no way to tell where
      those would be semantically appropriate, so at best they would consume unnecessary space
      and at worst they would be misleading.
    """
    if not matrix:
        return ""

    def cell_markup(cell_text: str) -> str:
        # -- neutralize things like '<' and '>' before any markup is introduced --
        safe_text = html.escape(cell_text)
        # -- line-feeds in the text are represented by <br/> elements --
        safe_text = safe_text.replace("\n", "<br/>")
        # -- trimming happens last, so only plain surrounding whitespace is removed --
        return f"<td>{safe_text.strip()}</td>"

    row_markups = [
        f"<tr>{''.join(cell_markup(text) for text in row)}</tr>"
        for row in matrix
        # -- a row with no cells emits nothing --
        if row
    ]
    return f"<table>{''.join(row_markups)}</table>"
def is_temp_file_path(file_path: str) -> bool:
"""True when file_path is in the Python-defined tempdir.