fix: handle encoding for text file checks (#707)

* fixed encoding issue for _is_text_file_a_json

* changelog and version
This commit is contained in:
Matt Robinson 2023-06-09 11:08:16 -04:00 committed by GitHub
parent b2b92ea79d
commit 0289ca3ea7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 35 additions and 9 deletions

View File

@ -1,4 +1,4 @@
## 0.7.3-dev2
## 0.7.3-dev3
### Enhancements

View File

@ -395,3 +395,16 @@ def test_is_text_file_a_csv(content, expected):
with BytesIO(content) as f:
assert _is_text_file_a_csv(file=f) == expected
def test_csv_json_check_with_filename_and_utf_32(filename="example-docs/fake-text-utf-32.txt"):
    """Neither the CSV nor the JSON check should flag the UTF-32 text fixture
    when it is passed by filename."""
    # CSV check runs first, then JSON, mirroring the order used elsewhere in the tests.
    for type_check in (_is_text_file_a_csv, _is_text_file_a_json):
        assert type_check(filename=filename) is False
def test_csv_json_check_with_file_and_utf_32(filename="example-docs/fake-text-utf-32.txt"):
    """Neither the CSV nor the JSON check should flag the UTF-32 text fixture
    when it is passed as an open binary file object."""
    for type_check in (_is_text_file_a_csv, _is_text_file_a_json):
        # Open the fixture anew for each check so every check gets its own handle.
        with open(filename, "rb") as stream:
            assert type_check(file=stream) is False

View File

@ -1 +1 @@
__version__ = "0.7.3-dev2" # pragma: no cover
__version__ = "0.7.3-dev3" # pragma: no cover

View File

@ -7,6 +7,7 @@ from functools import wraps
from typing import IO, Callable, List, Optional
from unstructured.documents.elements import Element, PageBreak
from unstructured.file_utils.encoding import detect_file_encoding
from unstructured.nlp.patterns import LIST_OF_DICTS_PATTERN
from unstructured.partition.common import (
_add_element_metadata,
@ -190,6 +191,7 @@ def detect_filetype(
content_type: Optional[str] = None,
file: Optional[IO] = None,
file_filename: Optional[str] = None,
encoding: Optional[str] = "utf-8",
) -> Optional[FileType]:
"""Use libmagic to determine a file's type. Helps determine which partition brick
to use for a given file. A return value of None indicates a non-supported file type.
@ -257,10 +259,10 @@ def detect_filetype(
elif extension and extension == ".html":
return FileType.HTML
if _is_text_file_a_json(file=file, filename=filename):
if _is_text_file_a_json(file=file, filename=filename, encoding=encoding):
return FileType.JSON
if _is_text_file_a_csv(file=file, filename=filename):
if _is_text_file_a_csv(file=file, filename=filename, encoding=encoding):
return FileType.CSV
if file and not extension and _check_eml_from_buffer(file=file) is True:
@ -333,6 +335,7 @@ def _detect_filetype_from_octet_stream(file: IO) -> FileType:
def _read_file_start_for_type_check(
filename: Optional[str] = None,
file: Optional[IO] = None,
encoding: Optional[str] = "utf-8",
) -> str:
"""Reads the start of the file and returns the text content."""
exactly_one(filename=filename, file=file)
@ -345,26 +348,33 @@ def _read_file_start_for_type_check(
file_text = file_content.decode(errors="ignore")
file.seek(0)
if filename is not None:
with open(filename) as f:
file_text = f.read(4096)
try:
with open(filename, encoding=encoding) as f:
file_text = f.read(4096)
except UnicodeDecodeError:
encoding, _ = detect_file_encoding(filename=filename)
with open(filename, encoding=encoding) as f:
file_text = f.read(4096)
return file_text
def _is_text_file_a_json(
    filename: Optional[str] = None,
    file: Optional[IO] = None,
    encoding: Optional[str] = "utf-8",
) -> bool:
    """Detects if a file that has a text/plain MIME type is a JSON file.

    The start of the file is read through ``_read_file_start_for_type_check``
    and matched against ``LIST_OF_DICTS_PATTERN``.

    Parameters
    ----------
    filename
        Path of the file to inspect.
    file
        Open file-like object to inspect.
    encoding
        Encoding forwarded to ``_read_file_start_for_type_check`` for reading
        the file.

    Returns
    -------
    bool
        True when the beginning of the text matches ``LIST_OF_DICTS_PATTERN``.
    """
    # Read the file start exactly once, forwarding the caller's encoding; the
    # earlier encoding-less read was redundant and its result was discarded.
    file_text = _read_file_start_for_type_check(file=file, filename=filename, encoding=encoding)
    # re.match anchors at the beginning of the text, so only a leading match counts.
    return re.match(LIST_OF_DICTS_PATTERN, file_text) is not None
def _is_text_file_a_csv(
filename: Optional[str] = None,
file: Optional[IO] = None,
encoding: Optional[str] = "utf-8",
):
"""Detects if a file that has a text/plain MIME type is a CSV file."""
file_text = _read_file_start_for_type_check(file=file, filename=filename)
file_text = _read_file_start_for_type_check(file=file, filename=filename, encoding=encoding)
lines = file_text.strip().splitlines()
if len(lines) < 2:
return False

View File

@ -112,6 +112,7 @@ def partition(
file=file,
file_filename=file_filename,
content_type=content_type,
encoding=encoding,
)
if file is not None:
@ -230,5 +231,7 @@ def file_and_type_from_url(
file = io.BytesIO(response.content)
content_type = content_type or response.headers.get("Content-Type")
filetype = detect_filetype(file=file, content_type=content_type)
encoding = response.headers.get("Content-Encoding", "utf-8")
filetype = detect_filetype(file=file, content_type=content_type, encoding=encoding)
return file, filetype