rfctr: prepare for fix to raises on file-like-object with name not a path to a file (#2617)

**Summary**
Improve typing and perform other mechanical refactoring in preparation for
the fix to issue 2308.
This commit is contained in:
Steve Canny 2024-03-06 15:46:54 -08:00 committed by GitHub
parent 79552ff70b
commit b59e4b69ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 90 additions and 101 deletions

View File

@ -1,4 +1,4 @@
## 0.12.6-dev5
## 0.12.6-dev6
### Enhancements

View File

@ -5,7 +5,8 @@ import pytest
from PIL import Image
from unstructured_inference.inference import layout
from unstructured_inference.inference.elements import TextRegion
from unstructured_inference.inference.layout import DocumentLayout, LayoutElement, PageLayout
from unstructured_inference.inference.layout import DocumentLayout, PageLayout
from unstructured_inference.inference.layoutelement import LayoutElement
from unstructured.documents.coordinates import PixelSpace
from unstructured.documents.elements import (
@ -28,7 +29,7 @@ from unstructured.partition.utils.constants import SORT_MODE_BASIC, SORT_MODE_DO
class MockPageLayout(layout.PageLayout):
def __init__(self, number: int, image: Image):
def __init__(self, number: int, image: Image.Image):
self.number = number
self.image = image

View File

@ -1 +1,11 @@
class Table: ...
# Minimal typing stubs for a table object: a table has rows, a row has cells, a cell has text.
# Classes are declared dependency-first so no annotation is a forward reference.
#
# NOTE: `tuple[X, ...]` is a homogeneous tuple of any length. A bare `tuple[X]` means a tuple
# of exactly ONE item, which would make iterating more than one row/cell a type error.
# NOTE(review): the real library may return a dedicated collection type rather than a tuple;
# the stub only models "iterable, indexable sequence of rows/cells" -- confirm if more is needed.


class _Cell:
    # Text content of this cell.
    @property
    def text(self) -> str: ...


class _Row:
    # All cells in this row, in order.
    @property
    def cells(self) -> tuple[_Cell, ...]: ...


class Table:
    # All rows in this table, in order.
    @property
    def rows(self) -> tuple[_Row, ...]: ...

View File

@ -1 +1 @@
__version__ = "0.12.6-dev5" # pragma: no cover
__version__ = "0.12.6-dev6" # pragma: no cover

View File

@ -899,7 +899,7 @@ class Footer(Text):
category = "Footer"
TYPE_TO_TEXT_ELEMENT_MAP: Dict[str, Any] = {
TYPE_TO_TEXT_ELEMENT_MAP: Dict[str, type[Text]] = {
ElementType.TITLE: Title,
ElementType.SECTION_HEADER: Title,
ElementType.HEADLINE: Title,

View File

@ -15,9 +15,9 @@ from unstructured.documents.elements import Element
from unstructured.file_utils.encoding import detect_file_encoding, format_encoding_str
from unstructured.nlp.patterns import LIST_OF_DICTS_PATTERN
from unstructured.partition.common import (
_add_element_metadata,
_remove_element_metadata,
add_element_metadata,
exactly_one,
remove_element_metadata,
set_element_hierarchy,
)
@ -602,16 +602,11 @@ def add_metadata(func: Callable[_P, List[Element]]) -> Callable[_P, List[Element
# NOTE(robinson) - Attached files have already run through this logic
# in their own partitioning function
if element.metadata.attached_to_filename is None:
_add_element_metadata(
element,
**metadata_kwargs, # type: ignore
)
add_element_metadata(element, **metadata_kwargs)
return elements
else:
return _remove_element_metadata(
elements,
)
return remove_element_metadata(elements)
return wrapper
@ -639,16 +634,11 @@ def add_filetype(
# NOTE(robinson) - Attached files have already run through this logic
# in their own partitioning function
if element.metadata.attached_to_filename is None:
_add_element_metadata(
element,
filetype=FILETYPE_TO_MIMETYPE[filetype],
)
add_element_metadata(element, filetype=FILETYPE_TO_MIMETYPE[filetype])
return elements
else:
return _remove_element_metadata(
elements,
)
return remove_element_metadata(elements)
return wrapper

View File

@ -6,17 +6,7 @@ import subprocess
from datetime import datetime
from io import BufferedReader, BytesIO, TextIOWrapper
from tempfile import SpooledTemporaryFile
from typing import (
IO,
TYPE_CHECKING,
Any,
BinaryIO,
Dict,
List,
Optional,
Tuple,
Union,
)
from typing import IO, TYPE_CHECKING, Any, BinaryIO, List, Optional
import emoji
from tabulate import tabulate
@ -39,11 +29,8 @@ from unstructured.nlp.patterns import ENUMERATED_BULLETS_RE, UNICODE_BULLETS_RE
from unstructured.partition.utils.constants import SORT_MODE_DONT, SORT_MODE_XY_CUT
from unstructured.utils import dependency_exists, first
if dependency_exists("docx") and dependency_exists("docx.table"):
from docx.table import Table as docxtable
if dependency_exists("pptx") and dependency_exists("pptx.table"):
from pptx.table import Table as pptxtable
from pptx.table import Table as PptxTable
if dependency_exists("numpy") and dependency_exists("cv2"):
from unstructured.partition.utils.sorting import sort_page_elements
@ -80,14 +67,20 @@ HIERARCHY_RULE_SET = {
}
def get_last_modified_date(filename: str) -> Union[str, None]:
def get_last_modified_date(filename: str) -> Optional[str]:
    """Modification time of file at path `filename`, if it exists.

    Returns `None` when `filename` is not a path to a file on the local filesystem.

    Otherwise returns date and time in ISO 8601 string format (YYYY-MM-DDTHH:MM:SS) like
    "2024-03-05T17:02:53".
    """
    # -- Honor the documented contract: anything that is not an existing regular file
    # -- (missing path, empty string, directory) yields `None` instead of letting
    # -- `os.path.getmtime()` raise or report a directory's timestamp.
    if not os.path.isfile(filename):
        return None

    modify_date = datetime.fromtimestamp(os.path.getmtime(filename))
    # -- `modify_date` is naive (no tzinfo), so the trailing `%z` expands to "" --
    return modify_date.strftime("%Y-%m-%dT%H:%M:%S%z")
def get_last_modified_date_from_file(
file: Union[IO[bytes], SpooledTemporaryFile[bytes], BinaryIO, bytes],
) -> Union[str, None]:
def get_last_modified_date_from_file(file: IO[bytes] | bytes) -> Optional[str]:
"""Modified timestamp of `file` if it corresponds to a file on the local filesystem."""
filename = None
if hasattr(file, "name"):
filename = file.name
@ -100,15 +93,11 @@ def get_last_modified_date_from_file(
def normalize_layout_element(
layout_element: Union[
"LayoutElement",
Element,
Dict[str, Any],
],
layout_element: LayoutElement | Element | dict[str, Any],
coordinate_system: Optional[CoordinateSystem] = None,
infer_list_items: bool = True,
source_format: Optional[str] = "html",
) -> Union[Element, List[Element]]:
) -> Element | list[Element]:
"""Converts an unstructured_inference LayoutElement object to an unstructured Element."""
if isinstance(layout_element, Element) and source_format == "html":
@ -123,7 +112,7 @@ def normalize_layout_element(
else:
layout_dict = layout_element
text = layout_dict.get("text")
text = layout_dict.get("text", "")
# Both `coordinates` and `coordinate_system` must be present
# in order to add coordinates metadata to the element.
coordinates = layout_dict.get("coordinates")
@ -148,7 +137,7 @@ def normalize_layout_element(
)
else:
return ListItem(
text=text if text else "",
text=text,
coordinates=coordinates,
coordinate_system=coordinate_system,
metadata=class_prob_metadata,
@ -156,6 +145,7 @@ def normalize_layout_element(
)
elif element_type in TYPE_TO_TEXT_ELEMENT_MAP:
assert isinstance(element_type, str)
_element_class = TYPE_TO_TEXT_ELEMENT_MAP[element_type]
_element_class = _element_class(
text=text,
@ -187,7 +177,7 @@ def normalize_layout_element(
)
else:
return Text(
text=text if text else "",
text=text,
coordinates=coordinates,
coordinate_system=coordinate_system,
metadata=class_prob_metadata,
@ -197,10 +187,10 @@ def normalize_layout_element(
def layout_list_to_list_items(
text: Optional[str],
coordinates: Optional[Tuple[Tuple[float, float], ...]],
coordinates: Optional[tuple[tuple[float, float], ...]],
coordinate_system: Optional[CoordinateSystem],
metadata=Optional[ElementMetadata],
detection_origin=Optional[str],
metadata: Optional[ElementMetadata],
detection_origin: Optional[str],
) -> List[Element]:
"""Converts a list LayoutElement to a list of ListItem elements."""
split_items = ENUMERATED_BULLETS_RE.split(text) if text else []
@ -226,9 +216,8 @@ def layout_list_to_list_items(
def set_element_hierarchy(
elements: List[Element],
ruleset: Dict[str, List[str]] = HIERARCHY_RULE_SET,
) -> List[Element]:
elements: List[Element], ruleset: dict[str, list[str]] = HIERARCHY_RULE_SET
) -> list[Element]:
"""Sets the parent_id for each element in the list of elements
based on the element's category, depth and a ruleset
@ -274,23 +263,25 @@ def set_element_hierarchy(
return elements
def _add_element_metadata(
def add_element_metadata(
element: Element,
filename: Optional[str] = None,
filetype: Optional[str] = None,
page_number: Optional[int] = None,
url: Optional[str] = None,
text_as_html: Optional[str] = None,
coordinates: Optional[Tuple[Tuple[float, float], ...]] = None,
coordinates: Optional[tuple[tuple[float, float], ...]] = None,
coordinate_system: Optional[CoordinateSystem] = None,
section: Optional[str] = None,
image_path: Optional[str] = None,
detection_origin: Optional[str] = None,
languages: Optional[List[str]] = None,
**kwargs,
**kwargs: Any,
) -> Element:
"""Adds document metadata to the document element. Document metadata includes information
like the filename, source url, and page number."""
"""Adds document metadata to the document element.
Document metadata includes information like the filename, source url, and page number.
"""
coordinates_metadata = (
CoordinatesMetadata(
@ -342,12 +333,11 @@ def _add_element_metadata(
return element
def _remove_element_metadata(
layout_elements,
) -> List[Element]:
"""Removes document metadata from the document element. Document metadata includes information
like the filename, source url, and page number."""
# Init an empty list of elements to write to
def remove_element_metadata(layout_elements) -> list[Element]:
"""Removes document metadata from the document element.
Document metadata includes information like the filename, source url, and page number.
"""
elements: List[Element] = []
metadata = ElementMetadata()
for layout_element in layout_elements:
@ -442,8 +432,8 @@ def exactly_one(**kwargs: Any) -> None:
def spooled_to_bytes_io_if_needed(
file_obj: Optional[Union[bytes, BinaryIO, SpooledTemporaryFile[bytes]]],
) -> Optional[Union[bytes, BinaryIO]]:
file_obj: bytes | BinaryIO | SpooledTemporaryFile[bytes] | None,
) -> bytes | BinaryIO | None:
if isinstance(file_obj, SpooledTemporaryFile):
file_obj.seek(0)
contents = file_obj.read()
@ -453,35 +443,35 @@ def spooled_to_bytes_io_if_needed(
return file_obj
def convert_to_bytes(
file: Optional[Union[bytes, SpooledTemporaryFile, IO[bytes]]] = None,
) -> bytes:
def convert_to_bytes(file: bytes | IO[bytes]) -> bytes:
"""Extract the bytes from `file` without preventing it from being read again later.
As a convenience to simplify client code, also returns `file` unchanged if it is already bytes.
"""
if isinstance(file, bytes):
f_bytes = file
elif isinstance(file, SpooledTemporaryFile):
return file
if isinstance(file, SpooledTemporaryFile):
file.seek(0)
f_bytes = file.read()
file.seek(0)
elif isinstance(file, BytesIO):
f_bytes = file.getvalue()
elif isinstance(file, (TextIOWrapper, BufferedReader)):
return f_bytes
if isinstance(file, BytesIO):
return file.getvalue()
if isinstance(file, (TextIOWrapper, BufferedReader)):
with open(file.name, "rb") as f:
f_bytes = f.read()
else:
raise ValueError("Invalid file-like object type")
return f.read()
return f_bytes
raise ValueError("Invalid file-like object type")
def convert_ms_office_table_to_text(
table: Union["docxtable", "pptxtable"],
as_html: bool = True,
) -> str:
"""
Convert a table object from a Word document to an HTML table string using the tabulate library.
def convert_ms_office_table_to_text(table: PptxTable, as_html: bool = True) -> str:
"""Convert a PPTX table object to an HTML table string using the tabulate library.
Args:
table (Table): A docx.table.Table object.
table (Table): A pptx.table.Table object.
as_html (bool): Whether to return the table as an HTML string (True) or a
plain text string (False)
@ -513,9 +503,7 @@ def contains_emoji(s: str) -> bool:
return bool(emoji.emoji_count(s))
def _get_page_image_metadata(
page: PageLayout,
) -> dict:
def _get_page_image_metadata(page: PageLayout) -> dict[str, Any]:
"""Retrieve image metadata and coordinate system from a page."""
image = getattr(page, "image", None)
@ -551,7 +539,7 @@ def document_to_element_list(
detection_origin: Optional[str] = None,
sort_mode: str = SORT_MODE_XY_CUT,
languages: Optional[List[str]] = None,
**kwargs,
**kwargs: Any,
) -> List[Element]:
"""Converts a DocumentLayout object to a list of unstructured elements."""
elements: List[Element] = []
@ -565,7 +553,7 @@ def document_to_element_list(
image_width = page_image_metadata.get("width")
image_height = page_image_metadata.get("height")
translation_mapping: List[Tuple["LayoutElement", Element]] = []
translation_mapping: list[tuple["LayoutElement", Element]] = []
for layout_element in page.elements:
if image_width and image_height and hasattr(layout_element.bbox, "coordinates"):
coordinate_system = PixelSpace(width=image_width, height=image_height)
@ -610,7 +598,7 @@ def document_to_element_list(
layout_element.image_path if hasattr(layout_element, "image_path") else None
)
_add_element_metadata(
add_element_metadata(
element,
page_number=i + 1,
filetype=image_format,
@ -642,16 +630,16 @@ def document_to_element_list(
def ocr_data_to_elements(
ocr_data: List["LayoutElement"],
image_size: Tuple[Union[int, float], Union[int, float]],
image_size: tuple[int | float, int | float],
common_metadata: Optional[ElementMetadata] = None,
infer_list_items: bool = True,
source_format: Optional[str] = None,
) -> List[Element]:
) -> list[Element]:
"""Convert OCR layout data into `unstructured` elements with associated metadata."""
image_width, image_height = image_size
coordinate_system = PixelSpace(width=image_width, height=image_height)
elements = []
elements: list[Element] = []
for layout_element in ocr_data:
element = normalize_layout_element(
layout_element,

View File

@ -135,7 +135,7 @@ def default_hi_res_model() -> str:
@add_chunking_strategy
def partition_pdf(
filename: str = "",
file: Optional[Union[BinaryIO, SpooledTemporaryFile]] = None,
file: Optional[Union[BinaryIO, SpooledTemporaryFile[bytes]]] = None,
include_page_breaks: bool = False,
strategy: str = PartitionStrategy.AUTO,
infer_table_structure: bool = False,
@ -151,7 +151,7 @@ def partition_pdf(
extract_image_block_types: Optional[List[str]] = None,
extract_image_block_output_dir: Optional[str] = None,
extract_image_block_to_payload: bool = False,
**kwargs,
**kwargs: Any,
) -> List[Element]:
"""Parses a pdf document into a list of interpreted elements.
Parameters

View File

@ -263,7 +263,7 @@ def validate_date_args(date: Optional[str] = None) -> bool:
)
def _first_and_remaining_iterator(it: Iterable[Any]) -> tuple[Any, Iterator[Any]]:
def _first_and_remaining_iterator(it: Iterable[_T]) -> Tuple[_T, Iterator[_T]]:
iterator = iter(it)
try:
out = next(iterator)
@ -275,7 +275,7 @@ def _first_and_remaining_iterator(it: Iterable[Any]) -> tuple[Any, Iterator[Any]
return out, iterator
def first(it: Iterable[Any]) -> Any:
def first(it: Iterable[_T]) -> _T:
    """Return the leading item of `it`; an empty iterable is an error.

    Delegates to `_first_and_remaining_iterator()` and discards the iterator over the
    remaining items that it also returns.
    """
    head, _remaining = _first_and_remaining_iterator(it)
    return head