diff --git a/CHANGELOG.md b/CHANGELOG.md
index cc31c8171..e714ac53b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,4 @@
-## 0.10.29-dev6
+## 0.10.29-dev7
 
 ### Enhancements
 
diff --git a/test_unstructured/partition/pptx/test_pptx.py b/test_unstructured/partition/pptx/test_pptx.py
index 510be2e2e..5eb72b702 100644
--- a/test_unstructured/partition/pptx/test_pptx.py
+++ b/test_unstructured/partition/pptx/test_pptx.py
@@ -395,7 +395,7 @@ def test_partition_pptx_respects_detect_language_per_element():
 def test_partition_pptx_raises_TypeError_for_invalid_languages():
     with pytest.raises(TypeError):
         filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-power-point.pptx")
-        partition_pptx(filename=filename, languages="eng")
+        partition_pptx(filename=filename, languages="eng")  # type: ignore
 
 
 # == DescribePptxPartitionerDownstreamBehaviors ==================================================
diff --git a/test_unstructured/partition/test_text.py b/test_unstructured/partition/test_text.py
index 810498a69..beb006d8f 100644
--- a/test_unstructured/partition/test_text.py
+++ b/test_unstructured/partition/test_text.py
@@ -1,23 +1,28 @@
+# pyright: reportPrivateUsage=false
+
 from __future__ import annotations
 
 import json
 import os
 import pathlib
+from typing import Optional, Sequence, Type, cast
 
 import pytest
+from pytest_mock import MockerFixture
 
 from test_unstructured.unit_utils import assert_round_trips_through_JSON, example_doc_path
 from unstructured.chunking.title import chunk_by_title
 from unstructured.cleaners.core import group_broken_paragraphs
-from unstructured.documents.elements import Address, ListItem, NarrativeText, Title
+from unstructured.documents.elements import Address, ListItem, NarrativeText, Text, Title
 from unstructured.partition.text import (
-    combine_paragraphs_less_than_min,
+    _combine_paragraphs_less_than_min,
+    _split_content_to_fit_max,
     partition_text,
-    split_content_to_fit_max,
 )
 from unstructured.partition.utils.constants import UNSTRUCTURED_INCLUDE_DEBUG_METADATA
 
 DIRECTORY = pathlib.Path(__file__).parent.resolve()
+EXAMPLE_DOCS_DIRECTORY = os.path.join(DIRECTORY, "..", "..", "example-docs")
 
 EXPECTED_OUTPUT = [
     NarrativeText(text="This is a test document to use for unit tests."),
@@ -62,8 +67,8 @@ End.
("fake-text-utf-16-be.txt", "utf-16-be"), ], ) -def test_partition_text_from_filename(filename, encoding): - filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename) +def test_partition_text_from_filename(filename: str, encoding: Optional[str]): + filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename) elements = partition_text(filename=filename_path, encoding=encoding) assert len(elements) > 0 assert elements == EXPECTED_OUTPUT @@ -74,7 +79,7 @@ def test_partition_text_from_filename(filename, encoding): def test_partition_text_from_filename_with_metadata_filename(): - filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt") + filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt") elements = partition_text( filename=filename_path, encoding="utf-8", @@ -89,8 +94,8 @@ def test_partition_text_from_filename_with_metadata_filename(): "filename", ["fake-text-utf-16.txt", "fake-text-utf-16-le.txt", "fake-text-utf-32.txt"], ) -def test_partition_text_from_filename_default_encoding(filename): - filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename) +def test_partition_text_from_filename_default_encoding(filename: str): + filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename) elements = partition_text(filename=filename_path) assert len(elements) > 0 assert elements == EXPECTED_OUTPUT @@ -105,15 +110,19 @@ def test_partition_text_from_filename_default_encoding(filename): ("fake-text-utf-16-be.txt", "utf-16", UnicodeError), ], ) -def test_partition_text_from_filename_raises_econding_error(filename, encoding, error): +def test_partition_text_from_filename_raises_econding_error( + filename: str, + encoding: Optional[str], + error: Type[BaseException], +): with pytest.raises(error): - filename = os.path.join(DIRECTORY, "..", "..", "example-docs", filename) + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename) partition_text(filename=filename, encoding=encoding) def test_partition_text_from_file(): - filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt") - with open(filename) as f: + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt") + with open(filename, "rb") as f: elements = partition_text(file=f) assert len(elements) > 0 assert elements == EXPECTED_OUTPUT @@ -122,8 +131,8 @@ def test_partition_text_from_file(): def test_partition_text_from_file_with_metadata_filename(): - filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt") - with open(filename) as f: + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt") + with open(filename, "rb") as f: elements = partition_text(file=f, metadata_filename="test") assert len(elements) > 0 assert elements == EXPECTED_OUTPUT @@ -135,9 +144,9 @@ def test_partition_text_from_file_with_metadata_filename(): "filename", ["fake-text-utf-16.txt", "fake-text-utf-16-le.txt", "fake-text-utf-32.txt"], ) -def test_partition_text_from_file_default_encoding(filename): - filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename) - with open(filename_path) as f: +def test_partition_text_from_file_default_encoding(filename: str): + filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename) + with open(filename_path, "rb") as f: elements = partition_text(file=f) assert len(elements) > 0 assert elements == EXPECTED_OUTPUT @@ -146,7 +155,7 @@ def test_partition_text_from_file_default_encoding(filename): def test_partition_text_from_bytes_file(): - filename = os.path.join(DIRECTORY, 
"..", "..", "example-docs", "fake-text.txt") + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt") with open(filename, "rb") as f: elements = partition_text(file=f) assert len(elements) > 0 @@ -159,8 +168,8 @@ def test_partition_text_from_bytes_file(): "filename", ["fake-text-utf-16.txt", "fake-text-utf-16-le.txt", "fake-text-utf-32.txt"], ) -def test_partition_text_from_bytes_file_default_encoding(filename): - filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename) +def test_partition_text_from_bytes_file_default_encoding(filename: str): + filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename) with open(filename_path, "rb") as f: elements = partition_text(file=f) assert len(elements) > 0 @@ -176,7 +185,7 @@ def test_text_partition_element_metadata_user_provided_languages(): def test_partition_text_from_text(): - filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt") + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt") with open(filename) as f: text = f.read() elements = partition_text(text=text) @@ -196,7 +205,7 @@ def test_partition_text_raises_with_none_specified(): def test_partition_text_raises_with_too_many_specified(): - filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt") + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt") with open(filename) as f: text = f.read() @@ -245,16 +254,18 @@ def test_partition_text_extract_regex_metadata(): assert element.metadata.filename is None -def test_partition_text_splits_long_text(filename="example-docs/norwich-city.txt"): - elements = partition_text(filename=filename) +def test_partition_text_splits_long_text(): + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt") + elements = cast(Sequence[Text], partition_text(filename=filename)) assert len(elements) > 0 assert elements[0].text.startswith("Iwan Roberts") assert elements[-1].text.endswith("External links") -def test_partition_text_splits_long_text_max_partition(filename="example-docs/norwich-city.txt"): - elements = partition_text(filename=filename) - elements_max_part = partition_text(filename=filename, max_partition=500) +def test_partition_text_splits_long_text_max_partition(): + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt") + elements = cast(Sequence[Text], partition_text(filename=filename)) + elements_max_part = cast(Sequence[Text], partition_text(filename=filename, max_partition=500)) # NOTE(klaijan) - I edited the operation here from < to <= # Please revert back if this does not make sense assert len(elements) <= len(elements_max_part) @@ -265,9 +276,13 @@ def test_partition_text_splits_long_text_max_partition(filename="example-docs/no assert " ".join([el.text for el in elements]) == " ".join([el.text for el in elements_max_part]) -def test_partition_text_splits_max_min_partition(filename="example-docs/norwich-city.txt"): - elements = partition_text(filename=filename) - elements_max_part = partition_text(filename=filename, min_partition=1000, max_partition=1500) +def test_partition_text_splits_max_min_partition(): + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt") + elements = cast(Sequence[Text], partition_text(filename=filename)) + elements_max_part = cast( + Sequence[Text], + partition_text(filename=filename, min_partition=1000, max_partition=1500), + ) for i, element in enumerate(elements_max_part): # NOTE(robinson) - the last element does not have a next element to merge with, # so it can be short @@ 
-298,10 +313,13 @@ def test_partition_text_splits_max_min_partition(filename="example-docs/norwich- ) -def test_partition_text_min_max(filename="example-docs/norwich-city.txt"): - segments = partition_text( - text=SHORT_PARAGRAPHS, - min_partition=6, +def test_partition_text_min_max(): + segments = cast( + Sequence[Text], + partition_text( + text=SHORT_PARAGRAPHS, + min_partition=6, + ), ) for i, segment in enumerate(segments): # NOTE(robinson) - the last element does not have a next element to merge with, @@ -309,10 +327,13 @@ def test_partition_text_min_max(filename="example-docs/norwich-city.txt"): if i < len(segments) - 1: assert len(segment.text) >= 6 - segments = partition_text( - text=SHORT_PARAGRAPHS, - max_partition=20, - min_partition=7, + segments = cast( + Sequence[Text], + partition_text( + text=SHORT_PARAGRAPHS, + max_partition=20, + min_partition=7, + ), ) for i, segment in enumerate(segments): # NOTE(robinson) - the last element does not have a next element to merge with, @@ -323,7 +344,7 @@ def test_partition_text_min_max(filename="example-docs/norwich-city.txt"): def test_split_content_to_fit_max(): - segments = split_content_to_fit_max( + segments = _split_content_to_fit_max( content=MIN_MAX_TEXT, max_partition=75, ) @@ -337,7 +358,7 @@ def test_split_content_to_fit_max(): def test_combine_paragraphs_less_than_min(): - segments = combine_paragraphs_less_than_min( + segments = _combine_paragraphs_less_than_min( SHORT_PARAGRAPHS.split("\n\n"), max_partition=1500, min_partition=7, @@ -347,7 +368,7 @@ def test_combine_paragraphs_less_than_min(): def test_partition_text_doesnt_get_page_breaks(): text = "--------------------" - elements = partition_text(text=text) + elements = cast(Sequence[Text], partition_text(text=text)) assert len(elements) == 1 assert elements[0].text == text assert not isinstance(elements[0], ListItem) @@ -361,8 +382,8 @@ def test_partition_text_doesnt_get_page_breaks(): ("fake-text-utf-16-be.txt", "utf-16-be"), ], ) -def test_partition_text_from_filename_exclude_metadata(filename, encoding): - filename = os.path.join(DIRECTORY, "..", "..", "example-docs", filename) +def test_partition_text_from_filename_exclude_metadata(filename: str, encoding: Optional[str]): + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename) elements = partition_text( filename=filename, encoding=encoding, @@ -373,17 +394,15 @@ def test_partition_text_from_filename_exclude_metadata(filename, encoding): def test_partition_text_from_file_exclude_metadata(): - filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt") - with open(filename) as f: + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt") + with open(filename, "rb") as f: elements = partition_text(file=f, include_metadata=False) for i in range(len(elements)): assert elements[i].metadata.to_dict() == {} -def test_partition_text_metadata_date( - mocker, - filename="example-docs/fake-text.txt", -): +def test_partition_text_metadata_date(mocker: MockerFixture): + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt") mocked_last_modification_date = "2029-07-05T09:24:28" mocker.patch( @@ -398,10 +417,8 @@ def test_partition_text_metadata_date( assert elements[0].metadata.last_modified == mocked_last_modification_date -def test_partition_text_with_custom_metadata_date( - mocker, - filename="example-docs/fake-text.txt", -): +def test_partition_text_with_custom_metadata_date(mocker: MockerFixture): + filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt") 
     mocked_last_modification_date = "2029-07-05T09:24:28"
     expected_last_modification_date = "2020-07-05T09:24:28"
 
@@ -418,10 +435,8 @@ def test_partition_text_with_custom_metadata_date(
     assert elements[0].metadata.last_modified == expected_last_modification_date
 
 
-def test_partition_text_from_file_metadata_date(
-    mocker,
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_from_file_metadata_date(mocker: MockerFixture):
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     mocked_last_modification_date = "2029-07-05T09:24:28"
 
     mocker.patch(
@@ -437,10 +452,8 @@
     assert elements[0].metadata.last_modified == mocked_last_modification_date
 
 
-def test_partition_text_from_file_with_custom_metadata_date(
-    mocker,
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_from_file_with_custom_metadata_date(mocker: MockerFixture):
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     mocked_last_modification_date = "2029-07-05T09:24:28"
     expected_last_modification_date = "2020-07-05T09:24:28"
 
@@ -455,9 +468,8 @@
     assert elements[0].metadata.last_modified == expected_last_modification_date
 
 
-def test_partition_text_from_text_metadata_date(
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_from_text_metadata_date():
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     with open(filename) as f:
         text = f.read()
 
@@ -467,9 +479,8 @@
     assert elements[0].metadata.last_modified is None
 
 
-def test_partition_text_from_text_with_custom_metadata_date(
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_from_text_with_custom_metadata_date():
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     expected_last_modification_date = "2020-07-05T09:24:28"
 
     with open(filename) as f:
@@ -487,8 +498,10 @@ def test_partition_text_with_unique_ids():
     json.dumps(elements[0].to_dict())
 
     elements = partition_text(text="hello there!", unique_element_ids=True)
-    assert len(elements[0].id) == 36
-    assert elements[0].id.count("-") == 4
+    id = elements[0].id
+    assert isinstance(id, str)  # included for type-narrowing
+    assert len(id) == 36
+    assert id.count("-") == 4
 
     # Test that the element is JSON serializable. This should run without an error
     json.dumps(elements[0].to_dict())
@@ -506,7 +519,8 @@ def test_partition_text_with_json(file_name: str, encoding: str | None):
     assert_round_trips_through_JSON(elements)
 
 
-def test_add_chunking_strategy_on_partition_text(filename="example-docs/norwich-city.txt"):
+def test_add_chunking_strategy_on_partition_text():
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
     elements = partition_text(filename=filename)
     chunk_elements = partition_text(filename, chunking_strategy="by_title")
     chunks = chunk_by_title(elements)
@@ -515,32 +529,32 @@ def test_add_chunking_strategy_on_partition_text(filename="example-docs/norwich-
 
 
 def test_partition_text_element_metadata_has_languages():
-    filename = "example-docs/norwich-city.txt"
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
     elements = partition_text(filename=filename)
     assert elements[0].metadata.languages == ["eng"]
 
 
 def test_partition_text_respects_detect_language_per_element():
-    filename = "example-docs/language-docs/eng_spa_mult.txt"
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "language-docs", "eng_spa_mult.txt")
     elements = partition_text(filename=filename, detect_language_per_element=True)
     langs = [element.metadata.languages for element in elements]
     assert langs == [["eng"], ["spa", "eng"], ["eng"], ["eng"], ["spa"]]
 
 
 def test_partition_text_respects_languages_arg():
-    filename = "example-docs/norwich-city.txt"
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
     elements = partition_text(filename=filename, languages=["deu"])
     assert elements[0].metadata.languages == ["deu"]
 
 
 def test_partition_text_element_metadata_raises_TypeError():
     with pytest.raises(TypeError):
-        filename = "example-docs/norwich-city.txt"
-        partition_text(filename=filename, languages="eng")
+        filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
+        partition_text(filename=filename, languages="eng")  # type: ignore
 
 
 def test_partition_text_detects_more_than_3_languages():
-    filename = "example-docs/language-docs/UDHR_first_article_all.txt"
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "language-docs", "UDHR_first_article_all.txt")
     elements = partition_text(filename=filename, detect_language_per_element=True)
     langs = list(
         {element.metadata.languages[0] for element in elements if element.metadata.languages},
diff --git a/unstructured/__version__.py b/unstructured/__version__.py
index 2a9a215cd..82ce929ac 100644
--- a/unstructured/__version__.py
+++ b/unstructured/__version__.py
@@ -1 +1 @@
-__version__ = "0.10.29-dev6"  # pragma: no cover
+__version__ = "0.10.29-dev7"  # pragma: no cover
diff --git a/unstructured/cleaners/core.py b/unstructured/cleaners/core.py
index e5df8df0c..02999de10 100644
--- a/unstructured/cleaners/core.py
+++ b/unstructured/cleaners/core.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import quopri
 import re
 import sys
@@ -132,8 +134,8 @@ def group_bullet_paragraph(paragraph: str) -> list:
 
 def group_broken_paragraphs(
     text: str,
-    line_split: re.Pattern = PARAGRAPH_PATTERN_RE,
-    paragraph_split: re.Pattern = DOUBLE_PARAGRAPH_PATTERN_RE,
+    line_split: re.Pattern[str] = PARAGRAPH_PATTERN_RE,
+    paragraph_split: re.Pattern[str] = DOUBLE_PARAGRAPH_PATTERN_RE,
 ) -> str:
     """Groups paragraphs that have line breaks for visual/formatting purposes.
     For example:
@@ -174,7 +176,7 @@ def group_broken_paragraphs(
 
 def new_line_grouper(
     text: str,
-    paragraph_split: re.Pattern = LINE_BREAK_RE,
+    paragraph_split: re.Pattern[str] = LINE_BREAK_RE,
 ) -> str:
     """
     Concatenates text document that has one-line paragraph break pattern
@@ -221,7 +223,7 @@ def blank_line_grouper(
 
 def auto_paragraph_grouper(
     text: str,
-    line_split: re.Pattern = LINE_BREAK_RE,
+    line_split: re.Pattern[str] = LINE_BREAK_RE,
     max_line_count: int = 2000,
     threshold: float = 0.1,
 ) -> str:
diff --git a/unstructured/partition/text.py b/unstructured/partition/text.py
index 8b840cf63..5d444e162 100644
--- a/unstructured/partition/text.py
+++ b/unstructured/partition/text.py
@@ -1,7 +1,7 @@
 import copy
 import re
 import textwrap
-from typing import IO, Callable, List, Optional, Tuple
+from typing import IO, Any, Callable, List, Optional, Tuple
 
 from unstructured.chunking.title import add_chunking_strategy
 from unstructured.cleaners.core import (
@@ -40,126 +40,6 @@ from unstructured.partition.text_type import (
 )
 
 
-def split_by_paragraph(
-    file_text: str,
-    min_partition: Optional[int] = 0,
-    max_partition: Optional[int] = 1500,
-) -> List[str]:
-    paragraphs = re.split(PARAGRAPH_PATTERN, file_text.strip())
-
-    split_paragraphs = []
-    for paragraph in paragraphs:
-        split_paragraphs.extend(
-            split_content_to_fit_max(
-                content=paragraph,
-                max_partition=max_partition,
-            ),
-        )
-
-    combined_paragraphs = combine_paragraphs_less_than_min(
-        split_paragraphs=split_paragraphs,
-        max_partition=max_partition,
-        min_partition=min_partition,
-    )
-
-    return combined_paragraphs
-
-
-def _split_in_half_at_breakpoint(
-    content: str,
-    breakpoint: str = " ",
-) -> List[str]:
-    """Splits a segment of content at the breakpoint closest to the middle"""
-    mid = len(content) // 2
-    for i in range(len(content) // 2):
-        if content[mid + i] == breakpoint:
-            mid += i
-            break
-        elif content[mid - i] == breakpoint:
-            mid += -i
-            break
-
-    return [content[:mid].rstrip(), content[mid:].lstrip()]
-
-
-def _split_content_size_n(content: str, n: int) -> List[str]:
-    """Splits a section of content into chunks that are at most
-    size n without breaking apart words."""
-    segments = []
-    if len(content) < n * 2:
-        segments = list(_split_in_half_at_breakpoint(content))
-    else:
-        segments = textwrap.wrap(content, width=n)
-    return segments
-
-
-def split_content_to_fit_max(
-    content: str,
-    max_partition: Optional[int] = 1500,
-) -> List[str]:
-    """Splits a paragraph or section of content so that all of the elements fit into the
-    max partition window."""
-    sentences = sent_tokenize(content)
-    chunks = []
-    tmp_chunk = ""
-    for sentence in sentences:
-        if max_partition is not None and len(sentence) > max_partition:
-            if tmp_chunk:
-                chunks.append(tmp_chunk)
-                tmp_chunk = ""
-            segments = _split_content_size_n(sentence, n=max_partition)
-            chunks.extend(segments[:-1])
-            tmp_chunk = segments[-1]
-        else:
-            if max_partition is not None and len(tmp_chunk + " " + sentence) > max_partition:
-                chunks.append(tmp_chunk)
-                tmp_chunk = sentence
-            else:
-                if not tmp_chunk:
-                    tmp_chunk = sentence
-                else:
-                    tmp_chunk += " " + sentence
-                tmp_chunk = tmp_chunk.strip()
-    if tmp_chunk:
-        chunks.append(tmp_chunk)
-
-    return chunks
-
-
-def combine_paragraphs_less_than_min(
-    split_paragraphs: List[str],
-    max_partition: Optional[int] = 1500,
-    min_partition: Optional[int] = 0,
-) -> List[str]:
-    """Combine paragraphs less than `min_partition` while not exceeding `max_partition`."""
-    min_partition = min_partition or 0
-    max_possible_partition = len(" ".join(split_paragraphs))
-    max_partition = max_partition or max_possible_partition
-
-    combined_paras = []
-    combined_idxs = []
-    for i, para in enumerate(split_paragraphs):
-        if i in combined_idxs:
-            continue
-
-        if len(para) >= min_partition:
-            combined_paras.append(para)
-        else:
-            combined_para = para
-            for j, next_para in enumerate(split_paragraphs[i + 1 :]):  # noqa
-                if len(combined_para) + len(next_para) + 1 <= max_partition:
-                    combined_idxs.append(i + j + 1)
-                    combined_para += " " + next_para
-                else:
-                    break
-            combined_paras.append(combined_para)
-
-    return combined_paras
-
-
-@process_metadata()
-@add_metadata_with_filetype(FileType.TXT)
-@add_chunking_strategy()
 def partition_text(
     filename: Optional[str] = None,
     file: Optional[IO[bytes]] = None,
@@ -175,7 +55,7 @@
     chunking_strategy: Optional[str] = None,
     detect_language_per_element: bool = False,
     detection_origin: Optional[str] = "text",
-    **kwargs,
+    **kwargs: Any,
 ) -> List[Element]:
     """Partitions an .txt documents into its constituent paragraph elements.
     If paragraphs are below "min_partition" or above "max_partition" boundaries,
@@ -185,7 +65,7 @@
     filename
         A string defining the target filename path.
     file
-        A file-like object using "r" mode --> open(filename, "r").
+        A file-like object using "rb" mode --> open(filename, "rb").
     text
         The string representation of the .txt document.
     encoding
@@ -210,6 +90,46 @@
     metadata_last_modified
         The day of the last modification
     """
+    return _partition_text(
+        filename=filename,
+        file=file,
+        text=text,
+        encoding=encoding,
+        paragraph_grouper=paragraph_grouper,
+        metadata_filename=metadata_filename,
+        include_metadata=include_metadata,
+        languages=languages,
+        max_partition=max_partition,
+        min_partition=min_partition,
+        metadata_last_modified=metadata_last_modified,
+        chunking_strategy=chunking_strategy,
+        detect_language_per_element=detect_language_per_element,
+        detection_origin=detection_origin,
+        **kwargs,
+    )
+
+
+@process_metadata()
+@add_metadata_with_filetype(FileType.TXT)
+@add_chunking_strategy()
+def _partition_text(
+    filename: Optional[str] = None,
+    file: Optional[IO[bytes]] = None,
+    text: Optional[str] = None,
+    encoding: Optional[str] = None,
+    paragraph_grouper: Optional[Callable[[str], str]] = None,
+    metadata_filename: Optional[str] = None,
+    include_metadata: bool = True,
+    languages: Optional[List[str]] = ["auto"],
+    max_partition: Optional[int] = 1500,
+    min_partition: Optional[int] = 0,
+    metadata_last_modified: Optional[str] = None,
+    chunking_strategy: Optional[str] = None,
+    detect_language_per_element: bool = False,
+    detection_origin: Optional[str] = "text",
+    **kwargs: Any,
+) -> List[Element]:
+    """Internal API for `partition_text`."""
     if text is not None and text.strip() == "" and not file and not filename:
         return []
 
@@ -222,6 +142,7 @@
     # Verify that only one of the arguments was provided
     exactly_one(filename=filename, file=file, text=text)
 
+    file_text = ""
     last_modification_date = None
     if filename is not None:
@@ -245,7 +166,7 @@
     if min_partition is not None and len(file_text) < min_partition:
         raise ValueError("`min_partition` cannot be larger than the length of file contents.")
 
-    file_content = split_by_paragraph(
+    file_content = _split_by_paragraph(
         file_text,
         min_partition=min_partition,
         max_partition=max_partition,
@@ -323,3 +244,133 @@ def element_from_text(
         coordinates=coordinates,
         coordinate_system=coordinate_system,
     )
+
+
+def _combine_paragraphs_less_than_min(
+    split_paragraphs: List[str],
+    max_partition: Optional[int] = 1500,
+    min_partition: Optional[int] = 0,
+) -> List[str]:
+    """Combine paragraphs less than `min_partition` while not exceeding `max_partition`."""
+    min_partition = min_partition or 0
+    max_possible_partition = len(" ".join(split_paragraphs))
+    max_partition = max_partition or max_possible_partition
+
+    combined_paras: List[str] = []
+    combined_idxs: List[int] = []
+    for i, para in enumerate(split_paragraphs):
+        if i in combined_idxs:
+            continue
+        # Paragraphs have already been split to fit `max_partition`, so they can be safely added
+        # to the final list of chunks if they are also greater than `min_partition`
+        if len(para) >= min_partition:
+            combined_paras.append(para)
+        else:
+            combined_para = para
+            for j, next_para in enumerate(split_paragraphs[i + 1 :]):  # noqa
+                # Combine the current paragraph(s), e.g. `combined_para` with the next paragraph(s)
+                # as long as they don't exceed `max_partition`, and keep track of the indices
+                # that have been combined.
+                if len(combined_para) + len(next_para) + 1 <= max_partition:
+                    combined_idxs.append(i + j + 1)
+                    combined_para += " " + next_para
+                else:
+                    break
+            combined_paras.append(combined_para)
+
+    return combined_paras
+
+
+def _split_by_paragraph(
+    file_text: str,
+    min_partition: Optional[int] = 0,
+    max_partition: Optional[int] = 1500,
+) -> List[str]:
+    """Split text into paragraphs that fit within the `min_` and `max_partition` window."""
+    paragraphs = re.split(PARAGRAPH_PATTERN, file_text.strip())
+
+    split_paragraphs: List[str] = []
+    for paragraph in paragraphs:
+        split_paragraphs.extend(
+            _split_content_to_fit_max(
+                content=paragraph,
+                max_partition=max_partition,
+            ),
+        )
+
+    combined_paragraphs = _combine_paragraphs_less_than_min(
+        split_paragraphs=split_paragraphs,
+        max_partition=max_partition,
+        min_partition=min_partition,
+    )
+
+    return combined_paragraphs
+
+
+def _split_content_size_n(content: str, n: int) -> List[str]:
+    """Splits a section of content into chunks that are at most
+    size n without breaking apart words."""
+    segments = []
+    if len(content) < n * 2:
+        segments = list(_split_in_half_at_breakpoint(content))
+    else:
+        segments = textwrap.wrap(content, width=n)
+    return segments
+
+
+def _split_content_to_fit_max(
+    content: str,
+    max_partition: Optional[int] = 1500,
+) -> List[str]:
+    """Splits a paragraph or section of content so that all of the elements fit into the
+    max partition window."""
+    sentences = sent_tokenize(content)
+    chunks: List[str] = []
+    tmp_chunk = ""
+    # Initialize an empty string to collect sentence segments (`tmp_chunk`).
+    for sentence in sentences:
+        # If a single sentence is larger than `max_partition`, the sentence will be split by
+        # `_split_content_size_n` and the last segment of the original sentence will be used
+        # as the beginning of the next chunk.
+        if max_partition is not None and len(sentence) > max_partition:
+            if tmp_chunk:
+                chunks.append(tmp_chunk)
+                tmp_chunk = ""
+            segments = _split_content_size_n(sentence, n=max_partition)
+            chunks.extend(segments[:-1])
+            tmp_chunk = segments[-1]
+        else:
+            # If the current sentence is smaller than `max_partition`, but adding it to the
+            # current `tmp_chunk` would exceed `max_partition`, add the `tmp_chunk` to the
+            # final list of `chunks` and begin the next chunk with the current sentence.
+            if max_partition is not None and len(tmp_chunk + " " + sentence) > max_partition:
+                chunks.append(tmp_chunk)
+                tmp_chunk = sentence
+            else:
+                # Otherwise, the sentence can be added to `tmp_chunk`
+                if not tmp_chunk:
+                    tmp_chunk = sentence
+                else:
+                    tmp_chunk += " " + sentence
+                tmp_chunk = tmp_chunk.strip()
+    if tmp_chunk:
+        chunks.append(tmp_chunk)
+
+    return chunks
+
+
+def _split_in_half_at_breakpoint(
+    content: str,
+    breakpoint: str = " ",
+) -> List[str]:
+    """Splits a segment of content at the breakpoint closest to the middle"""
+    mid = len(content) // 2
+    for i in range(len(content) // 2):
+        if content[mid + i] == breakpoint:
+            mid += i
+            break
+        elif content[mid - i] == breakpoint:
+            mid += -i
+            break
+
+    return [content[:mid].rstrip(), content[mid:].lstrip()]
diff --git a/unstructured/partition/text_type.py b/unstructured/partition/text_type.py
index 43d47d2fb..8f2b4a9db 100644
--- a/unstructured/partition/text_type.py
+++ b/unstructured/partition/text_type.py
@@ -311,6 +311,6 @@ def is_email_address(text: str) -> bool:
     return EMAIL_ADDRESS_PATTERN_RE.match(text.strip()) is not None
 
 
-def is_possible_numbered_list(text) -> bool:
+def is_possible_numbered_list(text: str) -> bool:
     """Checks to see if the text is a potential numbered list."""
     return NUMBERED_LIST_RE.match(text.strip()) is not None
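
Below is a minimal usage sketch, not part of the patch, of the public API after the refactor: `partition_text` keeps its documented signature and simply delegates to the decorated, private `_partition_text`, so callers are unaffected by the helper renames. The example-docs path and file name are borrowed from the tests above; "usage_sketch.py" is a hypothetical file name.

# usage_sketch.py (hypothetical) -- a sketch, assuming the example-docs checkout
# used by the test suite is available on disk.
from unstructured.partition.text import partition_text

# Paragraphs shorter than min_partition are merged forward; paragraphs longer
# than max_partition are split on sentence boundaries before elements are built.
elements = partition_text(
    filename="example-docs/norwich-city.txt",
    min_partition=100,
    max_partition=500,
)
print(f"{len(elements)} elements within the partition bounds (only the last may be short)")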
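And a behavioral sketch of the two renamed private helpers, which the updated tests now import directly (hence the `# pyright: reportPrivateUsage=false` escape at the top of test_text.py). The sample strings here are illustrative assumptions, not fixtures from the repo.

# helpers_sketch.py (hypothetical)
from unstructured.partition.text import (
    _combine_paragraphs_less_than_min,
    _split_content_to_fit_max,
)

# Sentences are accumulated into chunks of at most max_partition characters;
# a chunk is flushed as soon as the next sentence would overflow it.
chunks = _split_content_to_fit_max(content="One short sentence. " * 10, max_partition=75)
assert all(len(chunk) <= 75 for chunk in chunks)

# Paragraphs below min_partition are merged with the paragraphs that follow, as
# long as the combination stays within max_partition; only the last item may
# remain short because it has no successor to merge with.
combined = _combine_paragraphs_less_than_min(
    ["tiny", "also tiny", "x" * 50],
    max_partition=60,
    min_partition=10,
)
assert combined == ["tiny also tiny", "x" * 50]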