refactor text.py (#1872)

### Summary
Closes #1520 
Partial solution to #1521 

- Adds an abstraction layer between the user API and the partitioner
implementation (see the sketch after this list)
- Adds comments explaining paragraph chunking
- Makes edits to pass strict type-checking for both text.py and
test_text.py
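
The abstraction layer in the first bullet is a delegation pattern: the public `partition_text` keeps a stable, undecorated signature and forwards every argument to a private `_partition_text` that carries the `@process_metadata()`, `@add_metadata_with_filetype(FileType.TXT)`, and `@add_chunking_strategy()` decorators. A minimal, self-contained sketch of that pattern; the `fetch`/`_fetch`/`log_calls` names are hypothetical stand-ins, not part of this PR:

```python
from functools import wraps
from typing import Any, Callable, List


def log_calls(func: Callable[..., List[str]]) -> Callable[..., List[str]]:
    """Hypothetical stand-in for decorators such as @process_metadata()."""

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> List[str]:
        print(f"calling {func.__name__}")
        return func(*args, **kwargs)

    return wrapper


def fetch(name: str, **kwargs: Any) -> List[str]:
    """Public API: a plain function with a stable, fully typed signature."""
    return _fetch(name=name, **kwargs)


@log_calls
def _fetch(name: str, **kwargs: Any) -> List[str]:
    """Internal API: carries the decorators and the actual implementation."""
    return [f"fetched {name}"]
```

Keeping the decorators on the internal function means strict type-checkers see the public signature directly rather than through decorator wrappers.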
Authored by John on 2023-11-01 17:44:55 -05:00, committed by GitHub (commit 2f553333bd, parent b92cab7fbd).
7 changed files with 275 additions and 208 deletions

#### CHANGELOG.md

@@ -1,4 +1,4 @@
-## 0.10.29-dev6
+## 0.10.29-dev7
 ### Enhancements

#### test_unstructured/partition/test_pptx.py

@@ -395,7 +395,7 @@ def test_partition_pptx_respects_detect_language_per_element():
 def test_partition_pptx_raises_TypeError_for_invalid_languages():
     with pytest.raises(TypeError):
         filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-power-point.pptx")
-        partition_pptx(filename=filename, languages="eng")
+        partition_pptx(filename=filename, languages="eng")  # type: ignore
 # == DescribePptxPartitionerDownstreamBehaviors ==================================================
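
The added `# type: ignore` keeps a deliberately ill-typed call in this negative test: the argument must violate the `languages` annotation to trigger the `TypeError`, so the checker is silenced on that one line. A minimal sketch of the same pattern, with a hypothetical `takes_languages` standing in for `partition_pptx`:

```python
from typing import List

import pytest


def takes_languages(languages: List[str]) -> List[str]:
    """Hypothetical stand-in for a partitioner that validates `languages`."""
    if not isinstance(languages, list):
        raise TypeError("languages must be a list of language codes")
    return languages


def test_raises_TypeError_for_invalid_languages():
    with pytest.raises(TypeError):
        takes_languages(languages="eng")  # type: ignore
```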

#### test_unstructured/partition/test_text.py

@@ -1,23 +1,28 @@
+# pyright: reportPrivateUsage=false
 from __future__ import annotations
 import json
 import os
 import pathlib
+from typing import Optional, Sequence, Type, cast
 import pytest
+from pytest_mock import MockerFixture
 from test_unstructured.unit_utils import assert_round_trips_through_JSON, example_doc_path
 from unstructured.chunking.title import chunk_by_title
 from unstructured.cleaners.core import group_broken_paragraphs
-from unstructured.documents.elements import Address, ListItem, NarrativeText, Title
+from unstructured.documents.elements import Address, ListItem, NarrativeText, Text, Title
 from unstructured.partition.text import (
-    combine_paragraphs_less_than_min,
+    _combine_paragraphs_less_than_min,
+    _split_content_to_fit_max,
     partition_text,
-    split_content_to_fit_max,
 )
 from unstructured.partition.utils.constants import UNSTRUCTURED_INCLUDE_DEBUG_METADATA
 DIRECTORY = pathlib.Path(__file__).parent.resolve()
+EXAMPLE_DOCS_DIRECTORY = os.path.join(DIRECTORY, "..", "..", "example-docs")
 EXPECTED_OUTPUT = [
     NarrativeText(text="This is a test document to use for unit tests."),
@@ -62,8 +67,8 @@ End.
         ("fake-text-utf-16-be.txt", "utf-16-be"),
     ],
 )
-def test_partition_text_from_filename(filename, encoding):
-    filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
+def test_partition_text_from_filename(filename: str, encoding: Optional[str]):
+    filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename)
     elements = partition_text(filename=filename_path, encoding=encoding)
     assert len(elements) > 0
     assert elements == EXPECTED_OUTPUT
@@ -74,7 +79,7 @@ def test_partition_text_from_filename(filename, encoding):
 def test_partition_text_from_filename_with_metadata_filename():
-    filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt")
+    filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     elements = partition_text(
         filename=filename_path,
         encoding="utf-8",
@@ -89,8 +94,8 @@ def test_partition_text_from_filename_with_metadata_filename():
     "filename",
     ["fake-text-utf-16.txt", "fake-text-utf-16-le.txt", "fake-text-utf-32.txt"],
 )
-def test_partition_text_from_filename_default_encoding(filename):
-    filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
+def test_partition_text_from_filename_default_encoding(filename: str):
+    filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename)
     elements = partition_text(filename=filename_path)
     assert len(elements) > 0
     assert elements == EXPECTED_OUTPUT
@@ -105,15 +110,19 @@ def test_partition_text_from_filename_default_encoding(filename):
         ("fake-text-utf-16-be.txt", "utf-16", UnicodeError),
     ],
 )
-def test_partition_text_from_filename_raises_econding_error(filename, encoding, error):
+def test_partition_text_from_filename_raises_econding_error(
+    filename: str,
+    encoding: Optional[str],
+    error: Type[BaseException],
+):
     with pytest.raises(error):
-        filename = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
+        filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename)
         partition_text(filename=filename, encoding=encoding)
 def test_partition_text_from_file():
-    filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt")
-    with open(filename) as f:
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
+    with open(filename, "rb") as f:
         elements = partition_text(file=f)
     assert len(elements) > 0
     assert elements == EXPECTED_OUTPUT
@@ -122,8 +131,8 @@ def test_partition_text_from_file():
 def test_partition_text_from_file_with_metadata_filename():
-    filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt")
-    with open(filename) as f:
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
+    with open(filename, "rb") as f:
         elements = partition_text(file=f, metadata_filename="test")
     assert len(elements) > 0
     assert elements == EXPECTED_OUTPUT
@@ -135,9 +144,9 @@ def test_partition_text_from_file_with_metadata_filename():
     "filename",
     ["fake-text-utf-16.txt", "fake-text-utf-16-le.txt", "fake-text-utf-32.txt"],
 )
-def test_partition_text_from_file_default_encoding(filename):
-    filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
-    with open(filename_path) as f:
+def test_partition_text_from_file_default_encoding(filename: str):
+    filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename)
+    with open(filename_path, "rb") as f:
         elements = partition_text(file=f)
     assert len(elements) > 0
     assert elements == EXPECTED_OUTPUT
@@ -146,7 +155,7 @@ def test_partition_text_from_file_default_encoding(filename):
 def test_partition_text_from_bytes_file():
-    filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt")
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     with open(filename, "rb") as f:
         elements = partition_text(file=f)
     assert len(elements) > 0
@@ -159,8 +168,8 @@ def test_partition_text_from_bytes_file():
     "filename",
     ["fake-text-utf-16.txt", "fake-text-utf-16-le.txt", "fake-text-utf-32.txt"],
 )
-def test_partition_text_from_bytes_file_default_encoding(filename):
-    filename_path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
+def test_partition_text_from_bytes_file_default_encoding(filename: str):
+    filename_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename)
     with open(filename_path, "rb") as f:
         elements = partition_text(file=f)
     assert len(elements) > 0
@@ -176,7 +185,7 @@ def test_text_partition_element_metadata_user_provided_languages():
 def test_partition_text_from_text():
-    filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt")
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     with open(filename) as f:
         text = f.read()
     elements = partition_text(text=text)
@@ -196,7 +205,7 @@ def test_partition_text_raises_with_none_specified():
 def test_partition_text_raises_with_too_many_specified():
-    filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt")
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     with open(filename) as f:
         text = f.read()
@@ -245,16 +254,18 @@ def test_partition_text_extract_regex_metadata():
         assert element.metadata.filename is None
-def test_partition_text_splits_long_text(filename="example-docs/norwich-city.txt"):
-    elements = partition_text(filename=filename)
+def test_partition_text_splits_long_text():
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
+    elements = cast(Sequence[Text], partition_text(filename=filename))
     assert len(elements) > 0
     assert elements[0].text.startswith("Iwan Roberts")
     assert elements[-1].text.endswith("External links")
-def test_partition_text_splits_long_text_max_partition(filename="example-docs/norwich-city.txt"):
-    elements = partition_text(filename=filename)
-    elements_max_part = partition_text(filename=filename, max_partition=500)
+def test_partition_text_splits_long_text_max_partition():
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
+    elements = cast(Sequence[Text], partition_text(filename=filename))
+    elements_max_part = cast(Sequence[Text], partition_text(filename=filename, max_partition=500))
     # NOTE(klaijan) - I edited the operation here from < to <=
     # Please revert back if this does not make sense
     assert len(elements) <= len(elements_max_part)
@@ -265,9 +276,13 @@ def test_partition_text_splits_long_text_max_partition(filename="example-docs/no
     assert " ".join([el.text for el in elements]) == " ".join([el.text for el in elements_max_part])
-def test_partition_text_splits_max_min_partition(filename="example-docs/norwich-city.txt"):
-    elements = partition_text(filename=filename)
-    elements_max_part = partition_text(filename=filename, min_partition=1000, max_partition=1500)
+def test_partition_text_splits_max_min_partition():
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
+    elements = cast(Sequence[Text], partition_text(filename=filename))
+    elements_max_part = cast(
+        Sequence[Text],
+        partition_text(filename=filename, min_partition=1000, max_partition=1500),
+    )
     for i, element in enumerate(elements_max_part):
         # NOTE(robinson) - the last element does not have a next element to merge with,
         # so it can be short
@@ -298,10 +313,13 @@ def test_partition_text_splits_max_min_partition(filename="example-docs/norwich-
 )
-def test_partition_text_min_max(filename="example-docs/norwich-city.txt"):
-    segments = partition_text(
-        text=SHORT_PARAGRAPHS,
-        min_partition=6,
+def test_partition_text_min_max():
+    segments = cast(
+        Sequence[Text],
+        partition_text(
+            text=SHORT_PARAGRAPHS,
+            min_partition=6,
+        ),
     )
     for i, segment in enumerate(segments):
         # NOTE(robinson) - the last element does not have a next element to merge with,
@@ -309,10 +327,13 @@ def test_partition_text_min_max(filename="example-docs/norwich-city.txt"):
         if i < len(segments) - 1:
             assert len(segment.text) >= 6
-    segments = partition_text(
-        text=SHORT_PARAGRAPHS,
-        max_partition=20,
-        min_partition=7,
+    segments = cast(
+        Sequence[Text],
+        partition_text(
+            text=SHORT_PARAGRAPHS,
+            max_partition=20,
+            min_partition=7,
+        ),
     )
     for i, segment in enumerate(segments):
         # NOTE(robinson) - the last element does not have a next element to merge with,
@@ -323,7 +344,7 @@ def test_partition_text_min_max(filename="example-docs/norwich-city.txt"):
 def test_split_content_to_fit_max():
-    segments = split_content_to_fit_max(
+    segments = _split_content_to_fit_max(
         content=MIN_MAX_TEXT,
         max_partition=75,
     )
@@ -337,7 +358,7 @@ def test_split_content_to_fit_max():
 def test_combine_paragraphs_less_than_min():
-    segments = combine_paragraphs_less_than_min(
+    segments = _combine_paragraphs_less_than_min(
         SHORT_PARAGRAPHS.split("\n\n"),
         max_partition=1500,
         min_partition=7,
@@ -347,7 +368,7 @@ def test_combine_paragraphs_less_than_min():
 def test_partition_text_doesnt_get_page_breaks():
     text = "--------------------"
-    elements = partition_text(text=text)
+    elements = cast(Sequence[Text], partition_text(text=text))
     assert len(elements) == 1
     assert elements[0].text == text
     assert not isinstance(elements[0], ListItem)
@@ -361,8 +382,8 @@ def test_partition_text_doesnt_get_page_breaks():
         ("fake-text-utf-16-be.txt", "utf-16-be"),
     ],
 )
-def test_partition_text_from_filename_exclude_metadata(filename, encoding):
-    filename = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
+def test_partition_text_from_filename_exclude_metadata(filename: str, encoding: Optional[str]):
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, filename)
     elements = partition_text(
         filename=filename,
         encoding=encoding,
@@ -373,17 +394,15 @@ def test_partition_text_from_filename_exclude_metadata(filename, encoding):
 def test_partition_text_from_file_exclude_metadata():
-    filename = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt")
-    with open(filename) as f:
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
+    with open(filename, "rb") as f:
         elements = partition_text(file=f, include_metadata=False)
     for i in range(len(elements)):
         assert elements[i].metadata.to_dict() == {}
-def test_partition_text_metadata_date(
-    mocker,
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_metadata_date(mocker: MockerFixture):
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     mocked_last_modification_date = "2029-07-05T09:24:28"
     mocker.patch(
@@ -398,10 +417,8 @@ def test_partition_text_metadata_date(
     assert elements[0].metadata.last_modified == mocked_last_modification_date
-def test_partition_text_with_custom_metadata_date(
-    mocker,
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_with_custom_metadata_date(mocker: MockerFixture):
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     mocked_last_modification_date = "2029-07-05T09:24:28"
     expected_last_modification_date = "2020-07-05T09:24:28"
@@ -418,10 +435,8 @@ def test_partition_text_with_custom_metadata_date(
     assert elements[0].metadata.last_modified == expected_last_modification_date
-def test_partition_text_from_file_metadata_date(
-    mocker,
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_from_file_metadata_date(mocker: MockerFixture):
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
    mocked_last_modification_date = "2029-07-05T09:24:28"
     mocker.patch(
@@ -437,10 +452,8 @@ def test_partition_text_from_file_metadata_date(
     assert elements[0].metadata.last_modified == mocked_last_modification_date
-def test_partition_text_from_file_with_custom_metadata_date(
-    mocker,
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_from_file_with_custom_metadata_date(mocker: MockerFixture):
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     mocked_last_modification_date = "2029-07-05T09:24:28"
     expected_last_modification_date = "2020-07-05T09:24:28"
@@ -455,9 +468,8 @@ def test_partition_text_from_file_with_custom_metadata_date(
     assert elements[0].metadata.last_modified == expected_last_modification_date
-def test_partition_text_from_text_metadata_date(
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_from_text_metadata_date():
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     with open(filename) as f:
         text = f.read()
@@ -467,9 +479,8 @@ def test_partition_text_from_text_metadata_date(
     assert elements[0].metadata.last_modified is None
-def test_partition_text_from_text_with_custom_metadata_date(
-    filename="example-docs/fake-text.txt",
-):
+def test_partition_text_from_text_with_custom_metadata_date():
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "fake-text.txt")
     expected_last_modification_date = "2020-07-05T09:24:28"
     with open(filename) as f:
@@ -487,8 +498,10 @@ def test_partition_text_with_unique_ids():
     json.dumps(elements[0].to_dict())
     elements = partition_text(text="hello there!", unique_element_ids=True)
-    assert len(elements[0].id) == 36
-    assert elements[0].id.count("-") == 4
+    id = elements[0].id
+    assert isinstance(id, str)  # included for type-narrowing
+    assert len(id) == 36
+    assert id.count("-") == 4
     # Test that the element is JSON serializable. This should run without an error
     json.dumps(elements[0].to_dict())
@@ -506,7 +519,8 @@ def test_partition_text_with_json(file_name: str, encoding: str | None):
     assert_round_trips_through_JSON(elements)
-def test_add_chunking_strategy_on_partition_text(filename="example-docs/norwich-city.txt"):
+def test_add_chunking_strategy_on_partition_text():
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
     elements = partition_text(filename=filename)
     chunk_elements = partition_text(filename, chunking_strategy="by_title")
     chunks = chunk_by_title(elements)
@@ -515,32 +529,32 @@ def test_add_chunking_strategy_on_partition_text(filename="example-docs/norwich-
 def test_partition_text_element_metadata_has_languages():
-    filename = "example-docs/norwich-city.txt"
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
     elements = partition_text(filename=filename)
     assert elements[0].metadata.languages == ["eng"]
 def test_partition_text_respects_detect_language_per_element():
-    filename = "example-docs/language-docs/eng_spa_mult.txt"
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "language-docs", "eng_spa_mult.txt")
     elements = partition_text(filename=filename, detect_language_per_element=True)
     langs = [element.metadata.languages for element in elements]
     assert langs == [["eng"], ["spa", "eng"], ["eng"], ["eng"], ["spa"]]
 def test_partition_text_respects_languages_arg():
-    filename = "example-docs/norwich-city.txt"
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
     elements = partition_text(filename=filename, languages=["deu"])
     assert elements[0].metadata.languages == ["deu"]
 def test_partition_text_element_metadata_raises_TypeError():
     with pytest.raises(TypeError):
-        filename = "example-docs/norwich-city.txt"
-        partition_text(filename=filename, languages="eng")
+        filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "norwich-city.txt")
+        partition_text(filename=filename, languages="eng")  # type: ignore
 def test_partition_text_detects_more_than_3_languages():
-    filename = "example-docs/language-docs/UDHR_first_article_all.txt"
+    filename = os.path.join(EXAMPLE_DOCS_DIRECTORY, "language-docs", "UDHR_first_article_all.txt")
     elements = partition_text(filename=filename, detect_language_per_element=True)
     langs = list(
         {element.metadata.languages[0] for element in elements if element.metadata.languages},
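
Many of the edits above wrap `partition_text` in `typing.cast` because it returns `List[Element]`, while the assertions read `Text`-only attributes such as `.text`. A minimal sketch (with stand-in classes, not the real `unstructured` types) of why the narrowing satisfies a strict checker:

```python
from typing import List, Sequence, cast


class Element:
    """Stand-in for unstructured.documents.elements.Element."""


class Text(Element):
    """Stand-in element subtype that actually carries text."""

    def __init__(self, text: str) -> None:
        self.text = text


def partition(raw: str) -> List[Element]:
    """Stand-in partitioner that returns the broad Element type."""
    return [Text(p) for p in raw.split("\n\n")]


# Without the cast, a strict checker rejects `.text` access on Element.
elements = cast(Sequence[Text], partition("one\n\ntwo"))
assert elements[0].text == "one"
```

Note that `cast` is a type-checker-only assertion with no runtime effect, which is why the unique-ids test above also adds an `isinstance` assert for genuine runtime narrowing.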

#### unstructured/__version__.py

@@ -1 +1 @@
-__version__ = "0.10.29-dev6"  # pragma: no cover
+__version__ = "0.10.29-dev7"  # pragma: no cover

#### unstructured/cleaners/core.py

@@ -1,3 +1,5 @@
+from __future__ import annotations
 import quopri
 import re
 import sys
@@ -132,8 +134,8 @@ def group_bullet_paragraph(paragraph: str) -> list:
 def group_broken_paragraphs(
     text: str,
-    line_split: re.Pattern = PARAGRAPH_PATTERN_RE,
-    paragraph_split: re.Pattern = DOUBLE_PARAGRAPH_PATTERN_RE,
+    line_split: re.Pattern[str] = PARAGRAPH_PATTERN_RE,
+    paragraph_split: re.Pattern[str] = DOUBLE_PARAGRAPH_PATTERN_RE,
 ) -> str:
     """Groups paragraphs that have line breaks for visual/formatting purposes.
     For example:
@@ -174,7 +176,7 @@ def group_broken_paragraphs(
 def new_line_grouper(
     text: str,
-    paragraph_split: re.Pattern = LINE_BREAK_RE,
+    paragraph_split: re.Pattern[str] = LINE_BREAK_RE,
 ) -> str:
     """
     Concatenates text document that has one-line paragraph break pattern
@@ -221,7 +223,7 @@ def blank_line_grouper(
 def auto_paragraph_grouper(
     text: str,
-    line_split: re.Pattern = LINE_BREAK_RE,
+    line_split: re.Pattern[str] = LINE_BREAK_RE,
     max_line_count: int = 2000,
     threshold: float = 0.1,
 ) -> str:
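
The `re.Pattern[str]` annotations are what let these signatures pass strict checking: `re.Pattern` is generic over `str` versus `bytes`, and the bare form leaves the match type ambiguous. A small sketch of the parameterized form, assuming nothing beyond the standard library (`PARAGRAPH_RE` here is an illustrative name, not one from this file):

```python
from __future__ import annotations

import re
from typing import List

PARAGRAPH_RE = re.compile(r"\n\n+")


def split_paragraphs(text: str, splitter: re.Pattern[str] = PARAGRAPH_RE) -> List[str]:
    # `re.Pattern[str]` tells the checker that .split() consumes and
    # produces str, not bytes, so callers get precise types back.
    return splitter.split(text)


print(split_paragraphs("first\n\nsecond"))  # ['first', 'second']
```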

#### unstructured/partition/text.py

@@ -1,7 +1,7 @@
 import copy
 import re
 import textwrap
-from typing import IO, Callable, List, Optional, Tuple
+from typing import IO, Any, Callable, List, Optional, Tuple
 from unstructured.chunking.title import add_chunking_strategy
 from unstructured.cleaners.core import (
@@ -40,126 +40,6 @@ from unstructured.partition.text_type import (
 )
-def split_by_paragraph(
-    file_text: str,
-    min_partition: Optional[int] = 0,
-    max_partition: Optional[int] = 1500,
-) -> List[str]:
-    paragraphs = re.split(PARAGRAPH_PATTERN, file_text.strip())
-    split_paragraphs = []
-    for paragraph in paragraphs:
-        split_paragraphs.extend(
-            split_content_to_fit_max(
-                content=paragraph,
-                max_partition=max_partition,
-            ),
-        )
-    combined_paragraphs = combine_paragraphs_less_than_min(
-        split_paragraphs=split_paragraphs,
-        max_partition=max_partition,
-        min_partition=min_partition,
-    )
-    return combined_paragraphs
-def _split_in_half_at_breakpoint(
-    content: str,
-    breakpoint: str = " ",
-) -> List[str]:
-    """Splits a segment of content at the breakpoint closest to the middle"""
-    mid = len(content) // 2
-    for i in range(len(content) // 2):
-        if content[mid + i] == breakpoint:
-            mid += i
-            break
-        elif content[mid - i] == breakpoint:
-            mid += -i
-            break
-    return [content[:mid].rstrip(), content[mid:].lstrip()]
-def _split_content_size_n(content: str, n: int) -> List[str]:
-    """Splits a section of content into chunks that are at most
-    size n without breaking apart words."""
-    segments = []
-    if len(content) < n * 2:
-        segments = list(_split_in_half_at_breakpoint(content))
-    else:
-        segments = textwrap.wrap(content, width=n)
-    return segments
-def split_content_to_fit_max(
-    content: str,
-    max_partition: Optional[int] = 1500,
-) -> List[str]:
-    """Splits a paragraph or section of content so that all of the elements fit into the
-    max partition window."""
-    sentences = sent_tokenize(content)
-    chunks = []
-    tmp_chunk = ""
-    for sentence in sentences:
-        if max_partition is not None and len(sentence) > max_partition:
-            if tmp_chunk:
-                chunks.append(tmp_chunk)
-                tmp_chunk = ""
-            segments = _split_content_size_n(sentence, n=max_partition)
-            chunks.extend(segments[:-1])
-            tmp_chunk = segments[-1]
-        else:
-            if max_partition is not None and len(tmp_chunk + " " + sentence) > max_partition:
-                chunks.append(tmp_chunk)
-                tmp_chunk = sentence
-            else:
-                if not tmp_chunk:
-                    tmp_chunk = sentence
-                else:
-                    tmp_chunk += " " + sentence
-            tmp_chunk = tmp_chunk.strip()
-    if tmp_chunk:
-        chunks.append(tmp_chunk)
-    return chunks
-def combine_paragraphs_less_than_min(
-    split_paragraphs: List[str],
-    max_partition: Optional[int] = 1500,
-    min_partition: Optional[int] = 0,
-) -> List[str]:
-    """Combine paragraphs less than `min_partition` while not exceeding `max_partition`."""
-    min_partition = min_partition or 0
-    max_possible_partition = len(" ".join(split_paragraphs))
-    max_partition = max_partition or max_possible_partition
-    combined_paras = []
-    combined_idxs = []
-    for i, para in enumerate(split_paragraphs):
-        if i in combined_idxs:
-            continue
-        if len(para) >= min_partition:
-            combined_paras.append(para)
-        else:
-            combined_para = para
-            for j, next_para in enumerate(split_paragraphs[i + 1 :]):  # noqa
-                if len(combined_para) + len(next_para) + 1 <= max_partition:
-                    combined_idxs.append(i + j + 1)
-                    combined_para += " " + next_para
-                else:
-                    break
-            combined_paras.append(combined_para)
-    return combined_paras
-@process_metadata()
-@add_metadata_with_filetype(FileType.TXT)
-@add_chunking_strategy()
 def partition_text(
     filename: Optional[str] = None,
     file: Optional[IO[bytes]] = None,
@@ -175,7 +55,7 @@ def partition_text(
     chunking_strategy: Optional[str] = None,
     detect_language_per_element: bool = False,
     detection_origin: Optional[str] = "text",
-    **kwargs,
+    **kwargs: Any,
 ) -> List[Element]:
     """Partitions an .txt documents into its constituent paragraph elements.
     If paragraphs are below "min_partition" or above "max_partition" boundaries,
@@ -185,7 +65,7 @@ def partition_text(
     filename
         A string defining the target filename path.
     file
-        A file-like object using "r" mode --> open(filename, "r").
+        A file-like object using "rb" mode --> open(filename, "rb").
     text
         The string representation of the .txt document.
     encoding
@@ -210,6 +90,46 @@ def partition_text(
     metadata_last_modified
         The day of the last modification
     """
+    return _partition_text(
+        filename=filename,
+        file=file,
+        text=text,
+        encoding=encoding,
+        paragraph_grouper=paragraph_grouper,
+        metadata_filename=metadata_filename,
+        include_metadata=include_metadata,
+        languages=languages,
+        max_partition=max_partition,
+        min_partition=min_partition,
+        metadata_last_modified=metadata_last_modified,
+        chunking_strategy=chunking_strategy,
+        detect_language_per_element=detect_language_per_element,
+        detection_origin=detection_origin,
+        **kwargs,
+    )
+@process_metadata()
+@add_metadata_with_filetype(FileType.TXT)
+@add_chunking_strategy()
+def _partition_text(
+    filename: Optional[str] = None,
+    file: Optional[IO[bytes]] = None,
+    text: Optional[str] = None,
+    encoding: Optional[str] = None,
+    paragraph_grouper: Optional[Callable[[str], str]] = None,
+    metadata_filename: Optional[str] = None,
+    include_metadata: bool = True,
+    languages: Optional[List[str]] = ["auto"],
+    max_partition: Optional[int] = 1500,
+    min_partition: Optional[int] = 0,
+    metadata_last_modified: Optional[str] = None,
+    chunking_strategy: Optional[str] = None,
+    detect_language_per_element: bool = False,
+    detection_origin: Optional[str] = "text",
+    **kwargs: Any,
+) -> List[Element]:
+    """internal API for `partition_text`"""
     if text is not None and text.strip() == "" and not file and not filename:
         return []
@@ -222,6 +142,7 @@ def partition_text(
     # Verify that only one of the arguments was provided
     exactly_one(filename=filename, file=file, text=text)
+    file_text = ""
     last_modification_date = None
     if filename is not None:
@@ -245,7 +166,7 @@ def partition_text(
     if min_partition is not None and len(file_text) < min_partition:
         raise ValueError("`min_partition` cannot be larger than the length of file contents.")
-    file_content = split_by_paragraph(
+    file_content = _split_by_paragraph(
         file_text,
         min_partition=min_partition,
         max_partition=max_partition,
@@ -323,3 +244,133 @@ def element_from_text(
         coordinates=coordinates,
         coordinate_system=coordinate_system,
     )
+def _combine_paragraphs_less_than_min(
+    split_paragraphs: List[str],
+    max_partition: Optional[int] = 1500,
+    min_partition: Optional[int] = 0,
+) -> List[str]:
+    """Combine paragraphs less than `min_partition` while not exceeding `max_partition`."""
+    min_partition = min_partition or 0
+    max_possible_partition = len(" ".join(split_paragraphs))
+    max_partition = max_partition or max_possible_partition
+    combined_paras: List[str] = []
+    combined_idxs: List[int] = []
+    for i, para in enumerate(split_paragraphs):
+        if i in combined_idxs:
+            continue
+        # Paragraphs have already been split to fit `max_partition`, so they can be safely added
+        # to the final list of chunks if they are also greater than `min_partition`
+        if len(para) >= min_partition:
+            combined_paras.append(para)
+        else:
+            combined_para = para
+            for j, next_para in enumerate(split_paragraphs[i + 1 :]):  # noqa
+                # Combine the current paragraph(s), e.g. `combined_para` with the next paragraph(s)
+                # as long as they don't exceed `max_partition`, and keep track of the indices
+                # that have been combined.
+                if len(combined_para) + len(next_para) + 1 <= max_partition:
+                    combined_idxs.append(i + j + 1)
+                    combined_para += " " + next_para
+                else:
+                    break
+            combined_paras.append(combined_para)
+    return combined_paras
+def _split_by_paragraph(
+    file_text: str,
+    min_partition: Optional[int] = 0,
+    max_partition: Optional[int] = 1500,
+) -> List[str]:
+    """Split text into paragraphs that fit within the `min_` and `max_partition` window."""
+    paragraphs = re.split(PARAGRAPH_PATTERN, file_text.strip())
+    split_paragraphs: List[str] = []
+    for paragraph in paragraphs:
+        split_paragraphs.extend(
+            _split_content_to_fit_max(
+                content=paragraph,
+                max_partition=max_partition,
+            ),
+        )
+    combined_paragraphs = _combine_paragraphs_less_than_min(
+        split_paragraphs=split_paragraphs,
+        max_partition=max_partition,
+        min_partition=min_partition,
+    )
+    return combined_paragraphs
+def _split_content_size_n(content: str, n: int) -> List[str]:
+    """Splits a section of content into chunks that are at most
+    size n without breaking apart words."""
+    segments = []
+    if len(content) < n * 2:
+        segments = list(_split_in_half_at_breakpoint(content))
+    else:
+        segments = textwrap.wrap(content, width=n)
+    return segments
+def _split_content_to_fit_max(
+    content: str,
+    max_partition: Optional[int] = 1500,
+) -> List[str]:
+    """Splits a paragraph or section of content so that all of the elements fit into the
+    max partition window."""
+    sentences = sent_tokenize(content)
+    chunks: List[str] = []
+    tmp_chunk = ""
+    # Initialize an empty string to collect sentence segments (`tmp_chunk`).
+    for sentence in sentences:
+        # If a single sentence is larger than `max_partition`, the sentence will be split by
+        # `_split_content_size_n` and the last segment of the original sentence will be used
+        # as the beginning of the next chunk.
+        if max_partition is not None and len(sentence) > max_partition:
+            if tmp_chunk:
+                chunks.append(tmp_chunk)
+                tmp_chunk = ""
+            segments = _split_content_size_n(sentence, n=max_partition)
+            chunks.extend(segments[:-1])
+            tmp_chunk = segments[-1]
+        else:
+            # If the current sentence is smaller than `max_partition`, but adding it to the
+            # current `tmp_chunk` would exceed `max_partition`, add the `tmp_chunk` to the
+            # final list of `chunks` and begin the next chunk with the current sentence.
+            if max_partition is not None and len(tmp_chunk + " " + sentence) > max_partition:
+                chunks.append(tmp_chunk)
+                tmp_chunk = sentence
+            else:
+                # Otherwise, the sentence can be added to `tmp_chunk`
+                if not tmp_chunk:
+                    tmp_chunk = sentence
+                else:
+                    tmp_chunk += " " + sentence
+            tmp_chunk = tmp_chunk.strip()
+    if tmp_chunk:
+        chunks.append(tmp_chunk)
+    return chunks
+def _split_in_half_at_breakpoint(
+    content: str,
+    breakpoint: str = " ",
+) -> List[str]:
+    """Splits a segment of content at the breakpoint closest to the middle"""
+    mid = len(content) // 2
+    for i in range(len(content) // 2):
+        if content[mid + i] == breakpoint:
+            mid += i
+            break
+        elif content[mid - i] == breakpoint:
+            mid += -i
+            break
+    return [content[:mid].rstrip(), content[mid:].lstrip()]
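
Read together, the relocated helpers form a three-step pipeline: `_split_by_paragraph` splits on `PARAGRAPH_PATTERN`, `_split_content_to_fit_max` packs sentences into chunks no larger than `max_partition`, and `_combine_paragraphs_less_than_min` merges chunks shorter than `min_partition`. A hedged usage sketch, assuming the private helpers remain importable (the updated tests import them the same way) and that the NLTK sentence tokenizer is available:

```python
from unstructured.partition.text import (
    _combine_paragraphs_less_than_min,
    _split_content_to_fit_max,
)

paragraphs = ["A short paragraph.", "Tiny.", "A sentence that runs a little longer than the rest."]

# Step 1: split any paragraph that exceeds max_partition into smaller segments.
split = [seg for p in paragraphs for seg in _split_content_to_fit_max(p, max_partition=40)]

# Step 2: merge segments shorter than min_partition into following segments,
# never letting a merged chunk exceed max_partition.
chunks = _combine_paragraphs_less_than_min(split, max_partition=40, min_partition=10)

assert all(len(chunk) <= 40 for chunk in chunks)
```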

#### unstructured/partition/text_type.py

@@ -311,6 +311,6 @@ def is_email_address(text: str) -> bool:
     return EMAIL_ADDRESS_PATTERN_RE.match(text.strip()) is not None
-def is_possible_numbered_list(text) -> bool:
+def is_possible_numbered_list(text: str) -> bool:
     """Checks to see if the text is a potential numbered list."""
     return NUMBERED_LIST_RE.match(text.strip()) is not None