fix: filetype detection if a CSV has a text/plain MIME type (#691)

* fix: Filetype detection if a CSV has a text/plain MIME type #621

* bug: fix csv detection and create _read_file_start_for_type_check func

* fix: Make call to _is_text_file_a_csv from detect_filetype
This commit is contained in:
John 2023-06-08 15:21:07 -05:00 committed by GitHub
parent c1ba090c34
commit b2b92ea79d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 56 additions and 12 deletions

View File

@ -1,4 +1,4 @@
## 0.7.3-dev1
## 0.7.3-dev2
### Enhancements
@ -8,6 +8,7 @@
### Fixes
* Fix filetype detection when a CSV file has a `text/plain` MIME type
* `convert_office_doc` no longer prints file conversion info messages to stdout.
* `partition_via_api` reflects the actual filetype for the file processed in the API.

View File

@ -9,6 +9,7 @@ from unstructured.file_utils import filetype
from unstructured.file_utils.filetype import (
FileType,
_is_code_mime_type,
_is_text_file_a_csv,
_is_text_file_a_json,
detect_filetype,
)
@ -368,7 +369,9 @@ def test_filetype_order():
@pytest.mark.parametrize(
("content", "expected"),
[
(b"d\xe2\x80", False),
(b"d\xe2\x80", False), # Invalid JSON
(b'[{"key": "value"}]', True), # Valid JSON
(b"", False), # Empty content
],
)
def test_is_text_file_a_json(content, expected):
@ -376,3 +379,19 @@ def test_is_text_file_a_json(content, expected):
with BytesIO(content) as f:
assert _is_text_file_a_json(file=f) == expected
@pytest.mark.parametrize(
    ("content", "expected"),
    [
        (b"d\xe2\x80", False),  # A single token, no comma-separated rows
        (b'[{"key": "value"}]', False),  # JSON on one line, not CSV
        (b"column1,column2,column3\nvalue1,value2,value3\n", True),  # Header plus one row
        (b"", False),  # Nothing to read
    ],
)
def test_is_text_file_a_csv(content, expected):
    """Checks the CSV heuristic against raw bytes served from an in-memory file."""
    from io import BytesIO

    with BytesIO(content) as buffer:
        detected = _is_text_file_a_csv(file=buffer)
    assert detected == expected

View File

@ -502,7 +502,6 @@ def test_auto_partition_works_with_unstructured_jsons():
def test_auto_partition_works_with_unstructured_jsons_from_file():
    """Partitions the example JSON document from an open binary file handle."""
    json_path = os.path.join(EXAMPLE_DOCS_DIRECTORY, "spring-weather.html.json")
    with open(json_path, "rb") as json_file:
        parsed = partition(file=json_file, strategy="hi_res")
    first_element = parsed[0]
    assert first_element.text == "News Around NOAA"

View File

@ -1 +1 @@
__version__ = "0.7.3-dev1" # pragma: no cover
__version__ = "0.7.3-dev2" # pragma: no cover

View File

@ -260,6 +260,9 @@ def detect_filetype(
if _is_text_file_a_json(file=file, filename=filename):
return FileType.JSON
if _is_text_file_a_csv(file=file, filename=filename):
return FileType.CSV
if file and not extension and _check_eml_from_buffer(file=file) is True:
return FileType.EML
@ -327,14 +330,12 @@ def _detect_filetype_from_octet_stream(file: IO) -> FileType:
return FileType.UNK
def _is_text_file_a_json(
def _read_file_start_for_type_check(
filename: Optional[str] = None,
content_type: Optional[str] = None,
file: Optional[IO] = None,
):
"""Detects if a file that has a text/plain MIME type is a JSON file."""
) -> str:
"""Reads the start of the file and returns the text content."""
exactly_one(filename=filename, file=file)
if file is not None:
file.seek(0)
file_content = file.read(4096)
@ -343,13 +344,37 @@ def _is_text_file_a_json(
else:
file_text = file_content.decode(errors="ignore")
file.seek(0)
elif filename is not None:
if filename is not None:
with open(filename) as f:
file_text = f.read()
file_text = f.read(4096)
return file_text
def _is_text_file_a_json(
    filename: Optional[str] = None,
    file: Optional[IO] = None,
):
    """Reports whether a text/plain file looks like a JSON file.

    The decision is made by matching LIST_OF_DICTS_PATTERN against the
    beginning of the text returned by _read_file_start_for_type_check.
    """
    leading_text = _read_file_start_for_type_check(file=file, filename=filename)
    matched = re.match(LIST_OF_DICTS_PATTERN, leading_text)
    return matched is not None
def _is_text_file_a_csv(
    filename: Optional[str] = None,
    file: Optional[IO] = None,
) -> bool:
    """Detects if a file that has a text/plain MIME type is a CSV file.

    This is a heuristic over at most the first ten lines of the text returned
    by _read_file_start_for_type_check. The text counts as CSV when:

    * there are at least two lines,
    * every inspected line contains a comma, and
    * every row except the last has as many fields as the header row.

    Fields are counted with the csv module, so a quoted field that contains a
    comma (e.g. ``"Smith, John",42``) counts as one field instead of two.

    Args:
        filename: Path of the file to inspect. Forwarded to the read helper.
        file: Open file object to inspect. Forwarded to the read helper.

    Returns:
        True if the start of the file looks like comma-separated rows,
        False otherwise.
    """
    # Imported here so this function does not rely on the module-level imports.
    import csv

    file_text = _read_file_start_for_type_check(file=file, filename=filename)
    lines = file_text.strip().splitlines()[:10]
    if len(lines) < 2:
        return False
    if any("," not in line for line in lines):
        return False

    try:
        rows = list(csv.reader(lines))
    except csv.Error:
        # Content the csv parser rejects outright is not treated as CSV.
        return False

    expected_width = len(rows[0])
    # The last row is left out of the comparison: the text is only the start
    # of the file, so its final row may be cut off mid-record.
    return all(len(row) == expected_width for row in rows[:-1])
def _check_eml_from_buffer(file: IO) -> bool:
"""Checks if a text/plain file is actually a .eml file. Uses a regex pattern to see if the
start of the file matches the typical pattern for a .eml file."""
@ -359,7 +384,6 @@ def _check_eml_from_buffer(file: IO) -> bool:
file_head = file_content.decode("utf-8", errors="ignore")
else:
file_head = file_content
return EMAIL_HEAD_RE.match(file_head) is not None

View File

@ -105,3 +105,4 @@ ENDS_IN_PUNCT_RE = re.compile(ENDS_IN_PUNCT_PATTERN)
# NOTE(robinson) - Used to detect if text is in the expected "list of dicts"
# format for document elements. Anchored at the very start of the text: optional
# whitespace, an opening "[", optional whitespace, then an optional "{".
LIST_OF_DICTS_PATTERN = r"\A\s*\[\s*{?"
# Matches text that is wrapped in "{...}" or in "[...]". No regex flags are set
# here, so how "^", "$" and "." treat line breaks depends on the flags passed
# wherever this pattern is used.
JSON_PATTERN = r"^(?:\{.*\}|\[.*\])$"