Mirror of https://github.com/Unstructured-IO/unstructured.git (synced 2025-06-27 02:30:08 +00:00)
rfctr(file): refactor detect_filetype() (#3429)
**Summary** In preparation for fixing a cluster of bugs in automatic file-type detection and paving the way for some reliability improvements, refactor the `unstructured.file_utils.filetype` module and improve the thoroughness of its tests.

**Additional Context** Factor the type-recognition process into three distinct strategies attempted in sequence, in order of preference; recognition falls through to the next strategy when the one before it is not applicable or cannot determine the file-type. This provides a clear basis for organizing the code and tests at the top level. Consolidate the existing tests around these strategies, adding cases to achieve better coverage. Several bugs were uncovered in the process; small ones were fixed here, bigger ones will be remedied in follow-on PRs.
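The fall-through control flow described above is simple to picture. Below is a minimal sketch of that pattern only, not the code added in this PR: each strategy either returns a result or `None`, and the first non-`None` result wins.

```python
# Minimal sketch of the fall-through strategy chain described above;
# illustrative only, not the library implementation.
from typing import Callable, Optional

Strategy = Callable[[], Optional[str]]


def first_applicable(strategies: list[Strategy], default: str = "UNK") -> str:
    """Return the result of the first strategy that can determine a value."""
    for strategy in strategies:
        result = strategy()
        if result is not None:  # -- strategy applied and succeeded --
            return result
    return default  # -- every strategy was inapplicable or failed --


# Three "strategies" attempted in order of preference.
print(first_applicable([lambda: None, lambda: "DOCX", lambda: "ZIP"]))  # -> DOCX
print(first_applicable([lambda: None, lambda: None, lambda: None]))     # -> UNK
```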
This commit is contained in: parent 441b3393b1, commit 3fe5c094fa
@ -1,4 +1,4 @@
## 0.15.1-dev1
## 0.15.1-dev2

### Enhancements

@ -7,6 +7,10 @@
### Fixes

* **Update import of Pinecone exception** Adds compatibility for pinecone-client>=5.0.0
* **File-type detection catches non-existent file-path.** `detect_filetype()` no longer silently falls back to detecting a file-type based on the extension when no file exists at the path provided. Instead `FileNotFoundError` is raised. This provides consistent user notification of a mis-typed path rather than an unpredictable exception from a file-type specific partitioner when the file cannot be opened.
* **EML files specified as a file-path are detected correctly.** Resolved a bug where an EML file submitted to `partition()` as a file-path was identified as TXT and partitioned using `partition_text()`. EML files specified by path are now identified and processed correctly, including processing any attachments.
* **A DOCX, PPTX, or XLSX file specified by path and ambiguously identified as MIME-type "application/octet-stream" is identified correctly.** Resolves a shortcoming where a file specified by path immediately fell back to filename-extension based identification when misidentified as "application/octet-stream", either by asserted content type or a mis-guess by libmagic. An MS Office file misidentified in this way is now correctly identified regardless of its filename and whether it is specified by path or file-like object.
* **Textual content retrieved from a URL with gzip transport compression now partitions correctly.** Resolves a bug where a textual file-type (such as Markdown) retrieved by passing a URL to `partition()` would raise when `gzip` compression was used for transport by the server.

## 0.15.0
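The non-existent-path fix above can be illustrated with a small test sketch. It assumes the `file_path` keyword of the refactored `detect_filetype()` shown later in this diff; the path and test name are hypothetical placeholders.

```python
# Illustrative sketch of the new behavior: a mis-typed path now raises
# FileNotFoundError instead of silently falling back to extension-based detection.
import pytest

from unstructured.file_utils.filetype import detect_filetype


def test_missing_path_raises_rather_than_guessing_from_extension():
    with pytest.raises(FileNotFoundError):
        detect_filetype(file_path="no/such/file.docx")  # hypothetical missing path
```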
BIN  example-docs/simple.pptx  (new file; binary file not shown)
File diff suppressed because it is too large.
@ -10,6 +10,14 @@ from unstructured.file_utils.model import FileType
|
||||
class DescribeFileType:
|
||||
"""Unit-test suite for `unstructured.file_utils.model.Filetype`."""
|
||||
|
||||
# -- .__lt__() ----------------------------------------------
|
||||
|
||||
def it_is_a_collection_ordered_by_name_and_can_be_sorted(self):
|
||||
"""FileType is a total order on name, e.g. FileType.A < FileType.B."""
|
||||
assert FileType.EML < FileType.HTML < FileType.XML
|
||||
|
||||
# -- .from_extension() --------------------------------------
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("ext", "file_type"),
|
||||
[
|
||||
@ -23,10 +31,12 @@ class DescribeFileType:
|
||||
def it_can_recognize_a_file_type_from_an_extension(self, ext: str, file_type: FileType | None):
|
||||
assert FileType.from_extension(ext) is file_type
|
||||
|
||||
@pytest.mark.parametrize("ext", [".foobar", ".xyz", ".mdx", "", "."])
|
||||
def but_not_when_that_extension_is_empty_or_not_registered(self, ext: str):
|
||||
@pytest.mark.parametrize("ext", [".foobar", ".xyz", ".mdx", "", ".", None])
|
||||
def but_not_when_that_extension_is_empty_or_None_or_not_registered(self, ext: str | None):
|
||||
assert FileType.from_extension(ext) is None
|
||||
|
||||
# -- .from_mime_type() --------------------------------------
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("mime_type", "file_type"),
|
||||
[
|
||||
@ -46,29 +56,13 @@ class DescribeFileType:
|
||||
):
|
||||
assert FileType.from_mime_type(mime_type) is file_type
|
||||
|
||||
@pytest.mark.parametrize("mime_type", ["text/css", "image/gif", "audio/mpeg", "foo/bar"])
|
||||
def but_not_when_that_mime_type_is_not_registered_by_a_file_type(self, mime_type: str):
|
||||
@pytest.mark.parametrize("mime_type", ["text/css", "image/gif", "audio/mpeg", "foo/bar", None])
|
||||
def but_not_when_that_mime_type_is_not_registered_by_a_file_type_or_None(
|
||||
self, mime_type: str | None
|
||||
):
|
||||
assert FileType.from_mime_type(mime_type) is None
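The two lookups exercised by these tests can be summarized with a short usage sketch; the specific extension and MIME-type values here are illustrative.

```python
# Usage sketch for the classmethod lookups tested above (values illustrative).
from unstructured.file_utils.model import FileType

assert FileType.from_extension(".docx") is FileType.DOCX
assert FileType.from_extension(".xyz") is None      # unregistered extension
assert FileType.from_extension(None) is None        # None now tolerated (this PR)

assert FileType.from_mime_type("application/pdf") is FileType.PDF
assert FileType.from_mime_type("foo/bar") is None   # unregistered MIME-type
assert FileType.from_mime_type(None) is None        # None now tolerated (this PR)
```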
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("file_type", "expected_value"),
|
||||
[
|
||||
(FileType.BMP, ("unstructured_inference",)),
|
||||
(FileType.CSV, ("pandas",)),
|
||||
(FileType.DOC, ("docx",)),
|
||||
(FileType.EMPTY, ()),
|
||||
(FileType.HTML, ()),
|
||||
(FileType.ODT, ("docx", "pypandoc")),
|
||||
(FileType.PDF, ("pdf2image", "pdfminer", "PIL")),
|
||||
(FileType.UNK, ()),
|
||||
(FileType.WAV, ()),
|
||||
(FileType.ZIP, ()),
|
||||
],
|
||||
)
|
||||
def it_knows_which_importable_packages_its_partitioner_depends_on(
|
||||
self, file_type: FileType, expected_value: tuple[str, ...]
|
||||
):
|
||||
assert file_type.importable_package_dependencies == expected_value
|
||||
# -- .extra_name --------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("file_type", "expected_value"),
|
||||
@ -91,6 +85,30 @@ class DescribeFileType:
|
||||
):
|
||||
assert file_type.extra_name == expected_value
|
||||
|
||||
# -- .importable_package_dependencies -----------------------
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("file_type", "expected_value"),
|
||||
[
|
||||
(FileType.BMP, ("unstructured_inference",)),
|
||||
(FileType.CSV, ("pandas",)),
|
||||
(FileType.DOC, ("docx",)),
|
||||
(FileType.EMPTY, ()),
|
||||
(FileType.HTML, ()),
|
||||
(FileType.ODT, ("docx", "pypandoc")),
|
||||
(FileType.PDF, ("pdf2image", "pdfminer", "PIL")),
|
||||
(FileType.UNK, ()),
|
||||
(FileType.WAV, ()),
|
||||
(FileType.ZIP, ()),
|
||||
],
|
||||
)
|
||||
def it_knows_which_importable_packages_its_partitioner_depends_on(
|
||||
self, file_type: FileType, expected_value: tuple[str, ...]
|
||||
):
|
||||
assert file_type.importable_package_dependencies == expected_value
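The property exercised above lends itself to a pre-flight dependency check. A sketch of that idea follows; the helper name is hypothetical, only `importable_package_dependencies` comes from the library.

```python
# Hypothetical helper built on the property tested above: list the partitioner
# dependencies for a file-type that are not importable in this environment.
import importlib.util

from unstructured.file_utils.model import FileType


def missing_partitioner_packages(file_type: FileType) -> list[str]:
    return [
        pkg
        for pkg in file_type.importable_package_dependencies
        if importlib.util.find_spec(pkg) is None
    ]


print(missing_partitioner_packages(FileType.PDF))  # [] when the pdf extra is installed
```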
|
||||
|
||||
# -- .is_partitionable --------------------------------------
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("file_type", "expected_value"),
|
||||
[
|
||||
@ -112,6 +130,8 @@ class DescribeFileType:
|
||||
):
|
||||
assert file_type.is_partitionable is expected_value
|
||||
|
||||
# -- .mime_type ---------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("file_type", "mime_type"),
|
||||
[
|
||||
@ -131,6 +151,8 @@ class DescribeFileType:
|
||||
def it_knows_its_canonical_MIME_type(self, file_type: FileType, mime_type: str):
|
||||
assert file_type.mime_type == mime_type
|
||||
|
||||
# -- .partitioner_function_name -----------------------------
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("file_type", "expected_value"),
|
||||
[
|
||||
@ -155,6 +177,8 @@ class DescribeFileType:
|
||||
with pytest.raises(ValueError, match="`.partitioner_function_name` is undefined because "):
|
||||
file_type.partitioner_function_name
|
||||
|
||||
# -- .partitioner_module_qname ------------------------------
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("file_type", "expected_value"),
|
||||
[
|
||||
@ -181,6 +205,8 @@ class DescribeFileType:
|
||||
with pytest.raises(ValueError, match="`.partitioner_module_qname` is undefined because "):
|
||||
file_type.partitioner_module_qname
|
||||
|
||||
# -- .partitioner_shortname ---------------------------------
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("file_type", "expected_value"),
|
||||
[
|
||||
|
@ -1,6 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from test_unstructured.unit_utils import example_doc_path
|
||||
from unstructured.metrics.element_type import (
|
||||
FrequencyDict,
|
||||
calculate_element_type_percent_match,
|
||||
get_element_type_frequency,
|
||||
)
|
||||
@ -14,10 +18,9 @@ from unstructured.staging.base import elements_to_json
|
||||
(
|
||||
"fake-email.txt",
|
||||
{
|
||||
("UncategorizedText", None): 6,
|
||||
("NarrativeText", None): 1,
|
||||
("Title", None): 1,
|
||||
("ListItem", None): 2,
|
||||
("Title", None): 5,
|
||||
("NarrativeText", None): 2,
|
||||
},
|
||||
),
|
||||
(
|
||||
@ -34,8 +37,8 @@ from unstructured.staging.base import elements_to_json
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_get_element_type_frequency(filename, frequency):
|
||||
elements = partition(filename=f"example-docs/{filename}")
|
||||
def test_get_element_type_frequency(filename: str, frequency: dict[tuple[str, int | None], int]):
|
||||
elements = partition(example_doc_path(filename))
|
||||
elements_freq = get_element_type_frequency(elements_to_json(elements))
|
||||
assert elements_freq == frequency
|
||||
|
||||
@ -46,11 +49,11 @@ def test_get_element_type_frequency(filename, frequency):
|
||||
(
|
||||
"fake-email.txt",
|
||||
{
|
||||
("UncategorizedText", None): 14,
|
||||
("Title", None): 1,
|
||||
("ListItem", None): 2,
|
||||
("NarrativeText", None): 2,
|
||||
},
|
||||
(0.56, 0.56, 0.56),
|
||||
(0.8, 0.8, 0.80),
|
||||
),
|
||||
(
|
||||
"sample-presentation.pptx",
|
||||
@ -92,8 +95,10 @@ def test_get_element_type_frequency(filename, frequency):
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_calculate_element_type_percent_match(filename, expected_frequency, percent_matched):
|
||||
elements = partition(filename=f"example-docs/{filename}")
|
||||
def test_calculate_element_type_percent_match(
|
||||
filename: str, expected_frequency: FrequencyDict, percent_matched: tuple[float, float, float]
|
||||
):
|
||||
elements = partition(example_doc_path(filename))
|
||||
elements_frequency = get_element_type_frequency(elements_to_json(elements))
|
||||
assert (
|
||||
round(calculate_element_type_percent_match(elements_frequency, expected_frequency), 2)
|
||||
|
@ -929,7 +929,11 @@ def test_auto_partition_raises_with_bad_type(request: FixtureRequest):
|
||||
partition(filename="made-up.fake", strategy=PartitionStrategy.HI_RES)
|
||||
|
||||
detect_filetype_.assert_called_once_with(
|
||||
content_type=None, encoding=None, file=None, file_filename=None, filename="made-up.fake"
|
||||
file_path="made-up.fake",
|
||||
file=None,
|
||||
encoding=None,
|
||||
content_type=None,
|
||||
metadata_file_path=None,
|
||||
)
|
||||
|
||||
|
||||
@ -1305,7 +1309,7 @@ def test_auto_partition_that_requires_extras_raises_when_dependencies_are_not_in
|
||||
)
|
||||
match = r"partition_pdf\(\) is not available because one or more dependencies are not installed"
|
||||
with pytest.raises(ImportError, match=match):
|
||||
partition(example_doc_path("layout-parser-paper-fast.pdf"))
|
||||
partition(example_doc_path("pdf/layout-parser-paper-fast.pdf"))
|
||||
|
||||
dependency_exists_.assert_called_once_with("pdf2image")
|
||||
|
||||
|
@ -9,8 +9,8 @@ import tempfile
|
||||
import pytest
|
||||
from pytest_mock import MockFixture
|
||||
|
||||
from test_unstructured.unit_utils import example_doc_path
|
||||
from unstructured.documents.elements import CompositeElement
|
||||
from unstructured.file_utils.filetype import detect_filetype
|
||||
from unstructured.file_utils.model import FileType
|
||||
from unstructured.partition.email import partition_email
|
||||
from unstructured.partition.html import partition_html
|
||||
@ -43,9 +43,9 @@ def test_it_chunks_elements_when_a_chunking_strategy_is_specified():
|
||||
|
||||
@pytest.mark.parametrize("filename", test_files)
|
||||
def test_partition_json_from_filename(filename: str):
|
||||
path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
|
||||
path = example_doc_path(filename)
|
||||
elements = []
|
||||
filetype = detect_filetype(filename=path)
|
||||
filetype = FileType.from_extension(os.path.splitext(path)[1])
|
||||
if filetype == FileType.TXT:
|
||||
elements = partition_text(filename=path)
|
||||
if filetype == FileType.HTML:
|
||||
@ -72,9 +72,9 @@ def test_partition_json_from_filename(filename: str):
|
||||
|
||||
@pytest.mark.parametrize("filename", test_files)
|
||||
def test_partition_json_from_filename_with_metadata_filename(filename: str):
|
||||
path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
|
||||
path = example_doc_path(filename)
|
||||
elements = []
|
||||
filetype = detect_filetype(filename=path)
|
||||
filetype = FileType.from_extension(os.path.splitext(path)[1])
|
||||
if filetype == FileType.TXT:
|
||||
elements = partition_text(filename=path)
|
||||
if filetype == FileType.HTML:
|
||||
@ -97,9 +97,9 @@ def test_partition_json_from_filename_with_metadata_filename(filename: str):
|
||||
|
||||
@pytest.mark.parametrize("filename", test_files)
|
||||
def test_partition_json_from_file(filename: str):
|
||||
path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
|
||||
path = example_doc_path(filename)
|
||||
elements = []
|
||||
filetype = detect_filetype(filename=path)
|
||||
filetype = FileType.from_extension(os.path.splitext(path)[1])
|
||||
if filetype == FileType.TXT:
|
||||
elements = partition_text(filename=path)
|
||||
if filetype == FileType.HTML:
|
||||
@ -126,9 +126,9 @@ def test_partition_json_from_file(filename: str):
|
||||
|
||||
@pytest.mark.parametrize("filename", test_files)
|
||||
def test_partition_json_from_file_with_metadata_filename(filename: str):
|
||||
path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
|
||||
path = example_doc_path(filename)
|
||||
elements = []
|
||||
filetype = detect_filetype(filename=path)
|
||||
filetype = FileType.from_extension(os.path.splitext(path)[1])
|
||||
if filetype == FileType.TXT:
|
||||
elements = partition_text(filename=path)
|
||||
if filetype == FileType.HTML:
|
||||
@ -150,9 +150,9 @@ def test_partition_json_from_file_with_metadata_filename(filename: str):
|
||||
|
||||
@pytest.mark.parametrize("filename", test_files)
|
||||
def test_partition_json_from_text(filename: str):
|
||||
path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
|
||||
path = example_doc_path(filename)
|
||||
elements = []
|
||||
filetype = detect_filetype(filename=path)
|
||||
filetype = FileType.from_extension(os.path.splitext(path)[1])
|
||||
if filetype == FileType.TXT:
|
||||
elements = partition_text(filename=path)
|
||||
if filetype == FileType.HTML:
|
||||
@ -192,9 +192,9 @@ def test_partition_json_works_with_empty_list():
|
||||
|
||||
|
||||
def test_partition_json_raises_with_too_many_specified():
|
||||
path = os.path.join(DIRECTORY, "..", "..", "example-docs", "fake-text.txt")
|
||||
path = example_doc_path("fake-text.txt")
|
||||
elements = []
|
||||
filetype = detect_filetype(filename=path)
|
||||
filetype = FileType.from_extension(os.path.splitext(path)[1])
|
||||
if filetype == FileType.TXT:
|
||||
elements = partition_text(filename=path)
|
||||
if filetype == FileType.HTML:
|
||||
@ -225,9 +225,9 @@ def test_partition_json_raises_with_too_many_specified():
|
||||
|
||||
@pytest.mark.parametrize("filename", test_files)
|
||||
def test_partition_json_from_filename_exclude_metadata(filename: str):
|
||||
path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
|
||||
path = example_doc_path(filename)
|
||||
elements = []
|
||||
filetype = detect_filetype(filename=path)
|
||||
filetype = FileType.from_extension(os.path.splitext(path)[1])
|
||||
if filetype == FileType.TXT:
|
||||
elements = partition_text(filename=path)
|
||||
if filetype == FileType.HTML:
|
||||
@ -249,9 +249,9 @@ def test_partition_json_from_filename_exclude_metadata(filename: str):
|
||||
|
||||
@pytest.mark.parametrize("filename", test_files)
|
||||
def test_partition_json_from_file_exclude_metadata(filename: str):
|
||||
path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
|
||||
path = example_doc_path(filename)
|
||||
elements = []
|
||||
filetype = detect_filetype(filename=path)
|
||||
filetype = FileType.from_extension(os.path.splitext(path)[1])
|
||||
if filetype == FileType.TXT:
|
||||
elements = partition_text(filename=path)
|
||||
if filetype == FileType.HTML:
|
||||
@ -274,9 +274,9 @@ def test_partition_json_from_file_exclude_metadata(filename: str):
|
||||
|
||||
@pytest.mark.parametrize("filename", test_files)
|
||||
def test_partition_json_from_text_exclude_metadata(filename: str):
|
||||
path = os.path.join(DIRECTORY, "..", "..", "example-docs", filename)
|
||||
path = example_doc_path(filename)
|
||||
elements = []
|
||||
filetype = detect_filetype(filename=path)
|
||||
filetype = FileType.from_extension(os.path.splitext(path)[1])
|
||||
if filetype == FileType.TXT:
|
||||
elements = partition_text(filename=path)
|
||||
if filetype == FileType.HTML:
|
||||
|
@ -1 +1 @@
__version__ = "0.15.1-dev1" # pragma: no cover
__version__ = "0.15.1-dev2" # pragma: no cover
@ -1,18 +1,48 @@
|
||||
"""Automatically detect file-type based on inspection of the file's contents.
|
||||
|
||||
Auto-detection proceeds via a sequence of strategies. The first strategy to confidently determine a
|
||||
file-type returns that value. A strategy that is not applicable (because it lacks the required
input) or that cannot determine a file-type returns `None`, and execution continues with the
next strategy.
|
||||
|
||||
`_FileTypeDetector` is the main object and implements the three strategies.
|
||||
|
||||
The three strategies are:
|
||||
|
||||
- Use MIME-type asserted by caller in the `content_type` argument.
|
||||
- Guess a MIME-type using libmagic, falling back to the `filetype` package when libmagic is
|
||||
unavailable.
|
||||
- Map filename-extension to a `FileType` member.
|
||||
|
||||
A file that fails all three strategies is assigned the value `FileType.UNK`, for "unknown".
|
||||
|
||||
`_FileTypeDetectionContext` encapsulates the various arguments received by `detect_filetype()` and
|
||||
provides values derived from them. This object is immutable and can be passed to delegates of
|
||||
`_FileTypeDetector` to provide whatever context they need on the current detection instance.
|
||||
|
||||
`_FileTypeDetector` delegates to _differentiator_ objects like `_ZipFileDifferentiator` for
|
||||
specialized discrimination and/or confirmation of ambiguous or frequently mis-identified
|
||||
MIME-types. Additional differentiators are planned, one for `application/x-ole-storage`
|
||||
(DOC, PPT, XLS, and MSG file-types) and perhaps others.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import functools
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import zipfile
|
||||
from typing import IO, Callable, List, Optional
|
||||
from typing import IO, Callable, Iterator, Optional
|
||||
|
||||
import filetype as ft
|
||||
from typing_extensions import ParamSpec
|
||||
|
||||
from unstructured.documents.elements import Element
|
||||
from unstructured.file_utils.encoding import detect_file_encoding, format_encoding_str
|
||||
from unstructured.file_utils.model import PLAIN_TEXT_EXTENSIONS, FileType
|
||||
from unstructured.file_utils.model import FileType
|
||||
from unstructured.logger import logger
|
||||
from unstructured.nlp.patterns import EMAIL_HEAD_RE, LIST_OF_DICTS_PATTERN
|
||||
from unstructured.partition.common import (
|
||||
@ -21,179 +51,53 @@ from unstructured.partition.common import (
|
||||
remove_element_metadata,
|
||||
set_element_hierarchy,
|
||||
)
|
||||
from unstructured.utils import get_call_args_applying_defaults
|
||||
from unstructured.utils import get_call_args_applying_defaults, lazyproperty
|
||||
|
||||
LIBMAGIC_AVAILABLE = bool(importlib.util.find_spec("magic"))
|
||||
|
||||
|
||||
def detect_filetype(
|
||||
filename: Optional[str] = None,
|
||||
content_type: Optional[str] = None,
|
||||
file: Optional[IO[bytes]] = None,
|
||||
file_filename: Optional[str] = None,
|
||||
encoding: Optional[str] = "utf-8",
|
||||
file_path: str | None = None,
|
||||
file: IO[bytes] | None = None,
|
||||
encoding: str | None = None,
|
||||
content_type: str | None = None,
|
||||
metadata_file_path: Optional[str] = None,
|
||||
) -> FileType:
|
||||
"""Use libmagic to determine a file's type.
|
||||
"""Determine file-type of specified file using libmagic and/or fallback methods.
|
||||
|
||||
Helps determine which partition brick to use for a given file. A return value of None indicates
|
||||
a non-supported file type.
|
||||
One of `file_path` or `file` must be specified. A `file_path` that does not
correspond to a file on the filesystem raises `FileNotFoundError`.
|
||||
|
||||
Args:
|
||||
content_type: MIME-type of document-source, when already known. Providing
|
||||
a value for this argument disables auto-detection unless it does not map
|
||||
to a FileType member or is ambiguous, in which case it is ignored.
|
||||
encoding: Only used for textual file-types. When omitted, `utf-8` is
|
||||
assumed. Should generally be omitted except to resolve a problem with
|
||||
textual file-types like HTML.
|
||||
metadata_file_path: Only used when `file` is provided and then only as a
|
||||
source for a filename-extension that may be needed as a secondary
|
||||
content-type indicator. Ignored when the document is specified using
|
||||
`file_path`.
|
||||
|
||||
Returns:
|
||||
A member of the `FileType` enumeration, `FileType.UNK` when the file type
|
||||
could not be determined or is not supported.
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: when `file_path` is specified but does not correspond to a file
on the filesystem.
ValueError: when neither `file_path` nor `file` was specified.
|
||||
"""
|
||||
mime_type = None
|
||||
exactly_one(filename=filename, file=file)
|
||||
|
||||
# first check (content_type)
|
||||
if content_type:
|
||||
file_type = FileType.from_mime_type(content_type)
|
||||
if file_type:
|
||||
return file_type
|
||||
|
||||
# second check (filename/file_name/file)
|
||||
# continue if successfully define mime_type
|
||||
if filename or file_filename:
|
||||
_filename = filename or file_filename or ""
|
||||
_, extension = os.path.splitext(_filename)
|
||||
extension = extension.lower()
|
||||
if os.path.isfile(_filename) and LIBMAGIC_AVAILABLE:
|
||||
import magic
|
||||
|
||||
mime_type = magic.from_file(_resolve_symlink(_filename), mime=True)
|
||||
elif os.path.isfile(_filename):
|
||||
import filetype as ft
|
||||
|
||||
mime_type = ft.guess_mime(_filename)
|
||||
if mime_type is None:
|
||||
return FileType.from_extension(extension) or FileType.UNK
|
||||
|
||||
elif file is not None:
|
||||
if hasattr(file, "name"):
|
||||
_, extension = os.path.splitext(file.name)
|
||||
else:
|
||||
extension = ""
|
||||
extension = extension.lower()
|
||||
# NOTE(robinson) - the python-magic docs recommend reading at least the first 2048 bytes
|
||||
# Increased to 4096 because otherwise .xlsx files get detected as a zip file
|
||||
# ref: https://github.com/ahupp/python-magic#usage
|
||||
if LIBMAGIC_AVAILABLE:
|
||||
import magic
|
||||
|
||||
mime_type = magic.from_buffer(file.read(4096), mime=True)
|
||||
else:
|
||||
import filetype as ft
|
||||
|
||||
mime_type = ft.guess_mime(file.read(4096))
|
||||
if mime_type is None:
|
||||
logger.warning(
|
||||
"libmagic is unavailable but assists in filetype detection on file-like objects. "
|
||||
"Please consider installing libmagic for better results.",
|
||||
)
|
||||
return FileType.from_extension(extension) or FileType.UNK
|
||||
|
||||
else:
|
||||
raise ValueError("No filename, file, nor file_filename were specified.")
|
||||
|
||||
"""Mime type special cases."""
|
||||
# third check (mime_type)
|
||||
|
||||
# NOTE(Crag): older magic lib does not differentiate between xls and doc
|
||||
if mime_type == "application/msword" and extension == ".xls":
|
||||
return FileType.XLS
|
||||
|
||||
elif mime_type.endswith("xml"):
|
||||
if extension == ".html" or extension == ".htm":
|
||||
return FileType.HTML
|
||||
else:
|
||||
return FileType.XML
|
||||
|
||||
# -- ref: https://www.rfc-editor.org/rfc/rfc822 --
|
||||
elif mime_type == "message/rfc822" or mime_type.startswith("text"):
|
||||
if not encoding:
|
||||
encoding = "utf-8"
|
||||
formatted_encoding = format_encoding_str(encoding)
|
||||
|
||||
if extension in [
|
||||
".eml",
|
||||
".p7s",
|
||||
".md",
|
||||
".rtf",
|
||||
".html",
|
||||
".rst",
|
||||
".org",
|
||||
".csv",
|
||||
".tsv",
|
||||
".json",
|
||||
]:
|
||||
return FileType.from_extension(extension) or FileType.TXT
|
||||
|
||||
# NOTE(crag): for older versions of the OS libmagic package, such as is currently
|
||||
# installed on the Unstructured docker image, .json files resolve to "text/plain"
|
||||
# rather than "application/json". this corrects for that case.
|
||||
if _is_text_file_a_json(
|
||||
file=file,
|
||||
filename=filename,
|
||||
encoding=formatted_encoding,
|
||||
):
|
||||
return FileType.JSON
|
||||
|
||||
if _is_text_file_a_csv(
|
||||
file=file,
|
||||
filename=filename,
|
||||
encoding=formatted_encoding,
|
||||
):
|
||||
return FileType.CSV
|
||||
|
||||
if file and _check_eml_from_buffer(file=file) is True:
|
||||
return FileType.EML
|
||||
|
||||
if extension in PLAIN_TEXT_EXTENSIONS:
|
||||
return FileType.from_extension(extension) or FileType.UNK
|
||||
|
||||
# Safety catch
|
||||
if file_type := FileType.from_mime_type(mime_type):
|
||||
return file_type
|
||||
|
||||
return FileType.TXT
|
||||
|
||||
elif mime_type == "application/octet-stream":
|
||||
if extension == ".docx":
|
||||
return FileType.DOCX
|
||||
elif file:
|
||||
return _detect_filetype_from_octet_stream(file=file)
|
||||
else:
|
||||
return FileType.from_extension(extension) or FileType.UNK
|
||||
|
||||
elif mime_type == "application/zip":
|
||||
file_type = FileType.UNK
|
||||
if file:
|
||||
file_type = _detect_filetype_from_octet_stream(file=file)
|
||||
elif filename is not None:
|
||||
with open(filename, "rb") as f:
|
||||
file_type = _detect_filetype_from_octet_stream(file=f)
|
||||
|
||||
extension = extension if extension else ""
|
||||
return (
|
||||
FileType.ZIP
|
||||
if file_type in (FileType.UNK, FileType.ZIP)
|
||||
else FileType.from_extension(extension) or file_type
|
||||
)
|
||||
|
||||
elif _is_code_mime_type(mime_type):
|
||||
# NOTE(robinson) - we'll treat all code files as plain text for now.
|
||||
# we can update this logic and add filetypes for specific languages
|
||||
# later if needed.
|
||||
return FileType.TXT
|
||||
|
||||
elif mime_type.endswith("empty"):
|
||||
return FileType.EMPTY
|
||||
|
||||
# For everything else
|
||||
elif file_type := FileType.from_mime_type(mime_type):
|
||||
return file_type
|
||||
|
||||
logger.warning(
|
||||
f"The MIME type{f' of {filename!r}' if filename else ''} is {mime_type!r}. "
|
||||
"This file type is not currently supported in unstructured.",
|
||||
ctx = _FileTypeDetectionContext.new(
|
||||
file_path=file_path,
|
||||
file=file,
|
||||
encoding=encoding,
|
||||
content_type=content_type,
|
||||
metadata_file_path=metadata_file_path,
|
||||
)
|
||||
return FileType.from_extension(extension) or FileType.UNK
|
||||
return _FileTypeDetector.file_type(ctx)
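A usage sketch for the refactored entry point, based on the signature above. The paths refer to files in this repo's example-docs directory and the expected results assume libmagic (or the `filetype` fallback) behaves as the strategies describe.

```python
# Hedged usage sketch of the refactored detect_filetype().
from unstructured.file_utils.filetype import detect_filetype
from unstructured.file_utils.model import FileType

# file-path case: guessed MIME-type plus the Zip differentiator resolve PPTX
assert detect_filetype(file_path="example-docs/simple.pptx") is FileType.PPTX

# file-like-object case: metadata_file_path supplies the extension fallback
with open("example-docs/fake-text.txt", "rb") as f:
    assert detect_filetype(file=f, metadata_file_path="fake-text.txt") is FileType.TXT
```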
|
||||
|
||||
|
||||
def is_json_processable(
|
||||
@ -217,124 +121,476 @@ def is_json_processable(
|
||||
return re.match(LIST_OF_DICTS_PATTERN, file_text) is not None
|
||||
|
||||
|
||||
def _check_eml_from_buffer(file: IO[bytes] | IO[str]) -> bool:
|
||||
"""Checks if a text/plain file is actually a .eml file.
|
||||
class _FileTypeDetector:
|
||||
"""Determines file type from a variety of possible inputs."""
|
||||
|
||||
Uses a regex pattern to see if the start of the file matches the typical pattern for a .eml
|
||||
file.
|
||||
def __init__(self, ctx: _FileTypeDetectionContext):
|
||||
self._ctx = ctx
|
||||
|
||||
@classmethod
|
||||
def file_type(cls, ctx: _FileTypeDetectionContext) -> FileType:
|
||||
"""Detect file-type of document-source described by `ctx`."""
|
||||
return cls(ctx)._file_type
|
||||
|
||||
@property
|
||||
def _file_type(self) -> FileType:
|
||||
"""FileType member corresponding to this document source."""
|
||||
# -- strategy 1: use content-type asserted by caller --
|
||||
if file_type := self._file_type_from_content_type:
|
||||
return file_type
|
||||
|
||||
# -- strategy 2: guess MIME-type using libmagic and use that --
|
||||
if file_type := self._file_type_from_guessed_mime_type:
|
||||
return file_type
|
||||
|
||||
# -- strategy 3: use filename-extension, like ".docx" -> FileType.DOCX --
|
||||
if file_type := self._file_type_from_file_extension:
|
||||
return file_type
|
||||
|
||||
# -- strategy 4: give up and report FileType.UNK --
|
||||
return FileType.UNK
|
||||
|
||||
# == STRATEGIES ============================================================
|
||||
|
||||
@property
|
||||
def _file_type_from_content_type(self) -> FileType | None:
|
||||
"""Map passed content-type argument to a file-type, subject to certain rules."""
|
||||
content_type = self._ctx.content_type
|
||||
|
||||
# -- when no content-type was asserted by caller, this strategy is not applicable --
|
||||
if not content_type:
|
||||
return None
|
||||
|
||||
# -- otherwise we trust the passed `content_type` as long as `FileType` recognizes it --
|
||||
return FileType.from_mime_type(content_type)
|
||||
|
||||
@property
|
||||
def _file_type_from_guessed_mime_type(self) -> FileType | None:
|
||||
"""FileType based on auto-detection of MIME-type by libmagic.
|
||||
|
||||
In some cases refinements are necessary on the magic-derived MIME-types. This process
|
||||
includes applying those rules, most of which are accumulated through practical experience.
|
||||
"""
|
||||
mime_type = self._ctx.mime_type
|
||||
extension = self._ctx.extension
|
||||
|
||||
# -- when libmagic is not installed, the `filetype` package is used instead.
|
||||
# -- `filetype.guess()` returns `None` for file-types it does not support, which
|
||||
# -- unfortunately includes all the textual file-types like CSV, EML, HTML, MD, RST, RTF,
|
||||
# -- TSV, and TXT. When we have no guessed MIME-type, this strategy is not applicable.
|
||||
if mime_type is None:
|
||||
return None
|
||||
|
||||
# NOTE(Crag): older magic lib does not differentiate between xls and doc
|
||||
if mime_type == "application/msword" and extension == ".xls":
|
||||
return FileType.XLS
|
||||
|
||||
if mime_type.endswith("xml"):
|
||||
return FileType.HTML if extension in (".html", ".htm") else FileType.XML
|
||||
|
||||
if differentiator := _TextFileDifferentiator.applies(self._ctx):
|
||||
return differentiator.file_type
|
||||
|
||||
# -- applicable to "application/octet-stream", "application/zip", and all Office 2007+
|
||||
# -- document MIME-types, i.e. those for DOCX, PPTX, and XLSX. Note however it does NOT
|
||||
# -- apply to EPUB or ODT documents, even though those are also Zip archives. The zip and
|
||||
# -- octet-stream MIME-types are fed in because they are ambiguous. The MS-Office types are
|
||||
# -- differentiated because they are sometimes mistaken for each other, like DOCX mime-type
|
||||
# -- is actually a PPTX file etc.
|
||||
if differentiator := _ZipFileDifferentiator.applies(self._ctx, mime_type):
|
||||
return differentiator.file_type
|
||||
|
||||
# -- All source-code files (e.g. *.py, *.js) are classified as plain text for the moment --
|
||||
if self._ctx.has_code_mime_type:
|
||||
return FileType.TXT
|
||||
|
||||
if mime_type.endswith("empty"):
|
||||
return FileType.EMPTY
|
||||
|
||||
# -- if no more-specific rules apply, use the MIME-type -> FileType mapping when present --
|
||||
if file_type := FileType.from_mime_type(mime_type):
|
||||
return file_type
|
||||
|
||||
logger.warning(
|
||||
f"The MIME type{f' of {self._ctx.file_path!r}' if self._ctx.file_path else ''} is"
|
||||
f" {mime_type!r}. This file type is not currently supported in unstructured.",
|
||||
)
|
||||
return None
|
||||
|
||||
@lazyproperty
|
||||
def _file_type_from_file_extension(self) -> FileType | None:
|
||||
"""Determine file-type from filename extension.
|
||||
|
||||
Returns `None` when no filename is available or when the extension does not map to a
|
||||
supported file-type.
|
||||
"""
|
||||
return FileType.from_extension(self._ctx.extension)
|
||||
|
||||
|
||||
class _FileTypeDetectionContext:
|
||||
"""Provides all arguments to auto-file detection and values derived from them.
|
||||
|
||||
This keeps computation of derived values out of the file-detection code but more importantly
|
||||
allows the main filetype-detector to pass the full context to any delegates without coupling
|
||||
itself to which values it might need.
|
||||
"""
|
||||
file.seek(0)
|
||||
file_content = file.read(4096)
|
||||
if isinstance(file_content, bytes):
|
||||
file_head = file_content.decode("utf-8", errors="ignore")
|
||||
else:
|
||||
file_head = file_content
|
||||
return EMAIL_HEAD_RE.match(file_head) is not None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: str | None = None,
|
||||
*,
|
||||
file: IO[bytes] | None = None,
|
||||
encoding: str | None = None,
|
||||
content_type: str | None = None,
|
||||
metadata_file_path: str | None = None,
|
||||
):
|
||||
self._file_path = file_path
|
||||
self._file_arg = file
|
||||
self._encoding_arg = encoding
|
||||
self._content_type = content_type
|
||||
self._metadata_file_path = metadata_file_path
|
||||
|
||||
@classmethod
|
||||
def new(
|
||||
cls,
|
||||
*,
|
||||
file_path: str | None,
|
||||
file: IO[bytes] | None,
|
||||
encoding: str | None,
|
||||
content_type: str | None,
|
||||
metadata_file_path: str | None,
|
||||
):
|
||||
self = cls(
|
||||
file_path=file_path,
|
||||
file=file,
|
||||
encoding=encoding,
|
||||
content_type=content_type,
|
||||
metadata_file_path=metadata_file_path,
|
||||
)
|
||||
self._validate()
|
||||
return self
|
||||
|
||||
@lazyproperty
|
||||
def content_type(self) -> str | None:
|
||||
"""MIME-type asserted by caller; not based on inspection of file by this process.
|
||||
|
||||
Would commonly occur when the file was downloaded via HTTP and a `Content-Type` header was
present on the response. These are often ambiguous and sometimes just wrong, so they get
some further verification. All lower-case when not `None`.
|
||||
"""
|
||||
return self._content_type.lower() if self._content_type else None
|
||||
|
||||
@lazyproperty
|
||||
def encoding(self) -> str:
|
||||
"""Character-set used to encode text of this file.
|
||||
|
||||
Relevant for textual file-types only, like HTML, TXT, JSON, etc.
|
||||
"""
|
||||
return format_encoding_str(self._encoding_arg or "utf-8")
|
||||
|
||||
@lazyproperty
|
||||
def extension(self) -> str:
|
||||
"""Best filename-extension we can muster, "" when there is no available source."""
|
||||
# -- get from file_path, or file when it has a name (path) --
|
||||
with self.open() as file:
|
||||
if hasattr(file, "name") and file.name:
|
||||
return os.path.splitext(file.name)[1].lower()
|
||||
|
||||
# -- otherwise use metadata file-path when provided --
|
||||
if file_path := self._metadata_file_path:
|
||||
return os.path.splitext(file_path)[1].lower()
|
||||
|
||||
# -- otherwise empty str means no extension, same as a path like "a/b/name-no-ext" --
|
||||
return ""
|
||||
|
||||
@lazyproperty
|
||||
def file_head(self) -> bytes:
|
||||
"""The initial bytes of the file to be recognized, for use with libmagic detection."""
|
||||
with self.open() as file:
|
||||
return file.read(4096)
|
||||
|
||||
@lazyproperty
|
||||
def file_path(self) -> str | None:
|
||||
"""Filesystem path to file to be inspected, when provided on call.
|
||||
|
||||
None when the caller specified the source as a file-like object instead. Useful for user
|
||||
feedback on an error, but users of context should have little use for it otherwise.
|
||||
"""
|
||||
return self._file_path
|
||||
|
||||
@lazyproperty
|
||||
def is_zipfile(self) -> bool:
|
||||
"""True when file is a Zip archive."""
|
||||
with self.open() as file:
|
||||
return zipfile.is_zipfile(file)
|
||||
|
||||
@lazyproperty
|
||||
def has_code_mime_type(self) -> bool:
|
||||
"""True when `mime_type` plausibly indicates a programming language source-code file."""
|
||||
mime_type = self.mime_type
|
||||
|
||||
if mime_type is None:
|
||||
return False
|
||||
|
||||
# -- check Go separately to avoid matching other MIME types containing "go" --
|
||||
if mime_type == "text/x-go":
|
||||
return True
|
||||
|
||||
return any(
|
||||
lang in mime_type
|
||||
for lang in "c# c++ cpp csharp java javascript php python ruby swift typescript".split()
|
||||
)
|
||||
|
||||
@lazyproperty
|
||||
def mime_type(self) -> str | None:
|
||||
"""The best MIME-type we can get from `magic` (or `filetype` package).
|
||||
|
||||
A `str` return value is always in lower-case.
|
||||
"""
|
||||
if LIBMAGIC_AVAILABLE:
|
||||
import magic
|
||||
|
||||
mime_type = (
|
||||
magic.from_file(_resolve_symlink(self._file_path), mime=True)
|
||||
if self._file_path
|
||||
else magic.from_buffer(self.file_head, mime=True)
|
||||
)
|
||||
return mime_type.lower() if mime_type else None
|
||||
|
||||
mime_type = (
|
||||
ft.guess_mime(self._file_path) if self._file_path else ft.guess_mime(self.file_head)
|
||||
)
|
||||
|
||||
if mime_type is None:
|
||||
logger.warning(
|
||||
"libmagic is unavailable but assists in filetype detection. Please consider"
|
||||
" installing libmagic for better results."
|
||||
)
|
||||
return None
|
||||
|
||||
return mime_type.lower()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def open(self) -> Iterator[IO[bytes]]:
|
||||
"""Encapsulates complexity of dealing with file-path or file-like-object.
|
||||
|
||||
Provides an `IO[bytes]` object as the "common-denominator" document source.
|
||||
|
||||
Must be used as a context manager using a `with` statement:
|
||||
|
||||
with self.open() as file:
|
||||
do things with file
|
||||
|
||||
File is guaranteed to be at read position 0 when called.
|
||||
"""
|
||||
if self._file_path:
|
||||
with open(self._file_path, "rb") as f:
|
||||
yield f
|
||||
else:
|
||||
file = self._file_arg
|
||||
assert file is not None # -- guaranteed by `._validate()` --
|
||||
file.seek(0)
|
||||
yield file
|
||||
|
||||
@lazyproperty
|
||||
def text_head(self) -> str:
|
||||
"""The initial characters of the text file for use with text-format differentiation.
|
||||
|
||||
Raises:
|
||||
UnicodeDecodeError if file cannot be read as text.
|
||||
"""
|
||||
# TODO: only attempts fallback character-set detection for file-path case, not for
|
||||
# file-like object case. Seems like we should do both.
|
||||
|
||||
if file := self._file_arg:
|
||||
file.seek(0)
|
||||
content = file.read(4096)
|
||||
file.seek(0)
|
||||
return (
|
||||
content
|
||||
if isinstance(content, str)
|
||||
else content.decode(encoding=self.encoding, errors="ignore")
|
||||
)
|
||||
|
||||
file_path = self._file_path
|
||||
assert file_path is not None # -- guaranteed by `._validate` --
|
||||
|
||||
try:
|
||||
with open(file_path, encoding=self.encoding) as f:
|
||||
return f.read(4096)
|
||||
except UnicodeDecodeError:
|
||||
encoding, _ = detect_file_encoding(filename=file_path)
|
||||
with open(file_path, encoding=encoding) as f:
|
||||
return f.read(4096)
|
||||
|
||||
def _validate(self) -> None:
|
||||
"""Raise if the context is invalid."""
|
||||
if self._file_path and not os.path.isfile(self._file_path):
|
||||
raise FileNotFoundError(f"no such file {self._file_path}")
|
||||
if not self._file_path and not self._file_arg:
|
||||
raise ValueError("either `file_path` or `file` argument must be provided")
|
||||
|
||||
|
||||
def _detect_filetype_from_octet_stream(file: IO[bytes]) -> FileType:
|
||||
"""Detects the filetype, given a file with an application/octet-stream MIME type."""
|
||||
file.seek(0)
|
||||
if zipfile.is_zipfile(file):
|
||||
file.seek(0)
|
||||
archive = zipfile.ZipFile(file)
|
||||
class _TextFileDifferentiator:
|
||||
"""Refine a textual file-type that may not be as specific as it could be."""
|
||||
|
||||
# NOTE(robinson) - .docx.xlsx files are actually zip file with a .docx/.xslx extension.
|
||||
# If the MIME type is application/octet-stream, we check if it's a .docx/.xlsx file by
|
||||
# looking for expected filenames within the zip file.
|
||||
archive_filenames = [f.filename for f in archive.filelist]
|
||||
if all(f in archive_filenames for f in ("docProps/core.xml", "word/document.xml")):
|
||||
return FileType.DOCX
|
||||
elif all(f in archive_filenames for f in ("xl/workbook.xml",)):
|
||||
return FileType.XLSX
|
||||
elif all(f in archive_filenames for f in ("docProps/core.xml", "ppt/presentation.xml")):
|
||||
return FileType.PPTX
|
||||
def __init__(self, ctx: _FileTypeDetectionContext):
|
||||
self._ctx = ctx
|
||||
|
||||
if LIBMAGIC_AVAILABLE:
|
||||
import magic
|
||||
@classmethod
|
||||
def applies(cls, ctx: _FileTypeDetectionContext) -> _TextFileDifferentiator | None:
|
||||
"""Constructs an instance, but only if this differentiator applies in `ctx`."""
|
||||
mime_type = ctx.mime_type
|
||||
return (
|
||||
cls(ctx)
|
||||
if mime_type and (mime_type == "message/rfc822" or mime_type.startswith("text"))
|
||||
else None
|
||||
)
|
||||
|
||||
# Infer mime type using magic if octet-stream is not zip file
|
||||
mime_type = magic.from_buffer(file.read(4096), mime=True)
|
||||
return FileType.from_mime_type(mime_type) or FileType.UNK
|
||||
logger.warning(
|
||||
"Could not detect the filetype from application/octet-stream MIME type.",
|
||||
)
|
||||
return FileType.UNK
|
||||
@lazyproperty
|
||||
def file_type(self) -> FileType:
|
||||
"""Differentiated file-type for textual content.
|
||||
|
||||
Always produces a file-type, worst case that's `FileType.TXT` when nothing more specific
|
||||
applies.
|
||||
"""
|
||||
extension = self._ctx.extension
|
||||
|
||||
if extension in ".csv .eml .html .json .md .org .p7s .rst .rtf .tab .tsv".split():
|
||||
return FileType.from_extension(extension) or FileType.TXT
|
||||
|
||||
# NOTE(crag): for older versions of the OS libmagic package, such as is currently
|
||||
# installed on the Unstructured docker image, .json files resolve to "text/plain"
|
||||
# rather than "application/json". this corrects for that case.
|
||||
if self._is_json:
|
||||
return FileType.JSON
|
||||
|
||||
if self._is_csv:
|
||||
return FileType.CSV
|
||||
|
||||
if self._is_eml:
|
||||
return FileType.EML
|
||||
|
||||
if extension in (".text", ".txt"):
|
||||
return FileType.TXT
|
||||
|
||||
# Safety catch
|
||||
if file_type := FileType.from_mime_type(self._ctx.mime_type):
|
||||
return file_type
|
||||
|
||||
return FileType.TXT
|
||||
|
||||
@lazyproperty
|
||||
def _is_csv(self) -> bool:
|
||||
"""True when file is plausibly in Comma Separated Values (CSV) format."""
|
||||
|
||||
def count_commas(text: str):
|
||||
"""Counts the number of commas in a line, excluding commas in quotes."""
|
||||
pattern = r"(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$),"
|
||||
matches = re.findall(pattern, text)
|
||||
return len(matches)
|
||||
|
||||
lines = self._ctx.text_head.strip().splitlines()
|
||||
if len(lines) < 2:
|
||||
return False
|
||||
# -- check at most the first 10 lines --
|
||||
lines = lines[: len(lines)] if len(lines) < 10 else lines[:10]
|
||||
# -- any lines without at least one comma disqualifies the file --
|
||||
if any("," not in line for line in lines):
|
||||
return False
|
||||
header_count = count_commas(lines[0])
|
||||
return all(count_commas(line) == header_count for line in lines[1:])
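The comma-counting regex above ignores commas inside quoted fields; a quick standalone check of that behavior:

```python
# Standalone check of the quoted-comma behavior used by `count_commas` above.
import re

pattern = r"(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$),"
line = 'a,"b, with embedded comma",c'
print(len(re.findall(pattern, line)))  # -> 2 (the comma inside quotes is not counted)
```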
|
||||
|
||||
@lazyproperty
|
||||
def _is_eml(self) -> bool:
|
||||
"""Checks if a text/plain file is actually a .eml file.
|
||||
|
||||
Uses a regex pattern to see if the start of the file matches the typical pattern for a .eml
|
||||
file.
|
||||
"""
|
||||
return EMAIL_HEAD_RE.match(self._ctx.text_head) is not None
|
||||
|
||||
@lazyproperty
|
||||
def _is_json(self) -> bool:
|
||||
"""True when file is JSON collection.
|
||||
|
||||
A JSON file that contains only a string, number, or boolean, while valid JSON, will fail
|
||||
this test since it is not partitionable.
|
||||
"""
|
||||
text_head = self._ctx.text_head
|
||||
|
||||
# -- an empty file is not JSON --
|
||||
if not text_head:
|
||||
return False
|
||||
|
||||
# -- has to be a list or object, no string, number, or bool --
|
||||
if text_head.lstrip()[0] not in "[{":
|
||||
return False
|
||||
|
||||
try:
|
||||
with self._ctx.open() as file:
|
||||
json.load(file)
|
||||
return True
|
||||
except json.JSONDecodeError:
|
||||
return False
|
||||
|
||||
|
||||
def _is_code_mime_type(mime_type: str) -> bool:
|
||||
"""True when `mime_type` plausibly indicates a programming language source-code file."""
|
||||
PROGRAMMING_LANGUAGES = [
|
||||
"javascript",
|
||||
"python",
|
||||
"java",
|
||||
"c++",
|
||||
"cpp",
|
||||
"csharp",
|
||||
"c#",
|
||||
"php",
|
||||
"ruby",
|
||||
"swift",
|
||||
"typescript",
|
||||
]
|
||||
mime_type = mime_type.lower()
|
||||
# NOTE(robinson) - check this one explicitly to avoid conflicts with other
|
||||
# MIME types that contain "go"
|
||||
if mime_type == "text/x-go":
|
||||
return True
|
||||
return any(language in mime_type for language in PROGRAMMING_LANGUAGES)
|
||||
class _ZipFileDifferentiator:
|
||||
"""Refine a Zip-packaged file-type that may be ambiguous or swapped."""
|
||||
|
||||
def __init__(self, ctx: _FileTypeDetectionContext):
|
||||
self._ctx = ctx
|
||||
|
||||
def _is_text_file_a_csv(
|
||||
filename: Optional[str] = None,
|
||||
file: Optional[IO[bytes]] = None,
|
||||
encoding: Optional[str] = "utf-8",
|
||||
):
|
||||
"""Detects if a file that has a text/plain MIME type is a CSV file."""
|
||||
@classmethod
|
||||
def applies(
|
||||
cls, ctx: _FileTypeDetectionContext, mime_type: str
|
||||
) -> _ZipFileDifferentiator | None:
|
||||
"""Constructs an instance, but only if this differentiator applies for `mime_type`.
|
||||
|
||||
def count_commas(text: str):
|
||||
"""Counts the number of commas in a line, excluding commas in quotes."""
|
||||
pattern = r"(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$),"
|
||||
matches = re.findall(pattern, text)
|
||||
return len(matches)
|
||||
Separate `mime_type` argument allows it to be applied to either asserted content-type or
|
||||
guessed mime-type.
|
||||
"""
|
||||
return (
|
||||
cls(ctx)
|
||||
if mime_type
|
||||
in (
|
||||
"application/octet-stream",
|
||||
"application/zip",
|
||||
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
||||
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
|
||||
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
||||
)
|
||||
else None
|
||||
)
|
||||
|
||||
file_text = _read_file_start_for_type_check(
|
||||
file=file,
|
||||
filename=filename,
|
||||
encoding=encoding,
|
||||
)
|
||||
lines = file_text.strip().splitlines()
|
||||
if len(lines) < 2:
|
||||
return False
|
||||
lines = lines[: len(lines)] if len(lines) < 10 else lines[:10]
|
||||
header_count = count_commas(lines[0])
|
||||
if any("," not in line for line in lines):
|
||||
return False
|
||||
return all(count_commas(line) == header_count for line in lines[1:])
|
||||
@lazyproperty
|
||||
def file_type(self) -> FileType | None:
|
||||
"""Differentiated file-type for a Zip archive.
|
||||
|
||||
Returns `None` if the file is not a Zip archive. Otherwise it returns `FileType.DOCX`,
|
||||
`FileType.PPTX`, or `FileType.XLSX` when one of those applies and `FileType.ZIP` otherwise.
|
||||
"""
|
||||
if not self._ctx.is_zipfile:
|
||||
return None
|
||||
|
||||
def _is_text_file_a_json(
|
||||
filename: Optional[str] = None,
|
||||
file: Optional[IO[bytes]] = None,
|
||||
encoding: Optional[str] = "utf-8",
|
||||
):
|
||||
"""Detects if a file that has a text/plain MIME type is a JSON file."""
|
||||
file_text = _read_file_start_for_type_check(
|
||||
file=file,
|
||||
filename=filename,
|
||||
encoding=encoding,
|
||||
)
|
||||
try:
|
||||
output = json.loads(file_text)
|
||||
# NOTE(robinson) - Per RFC 4627 which defines the application/json media type,
|
||||
# a string is a valid JSON. For our purposes, however, we want to treat that
|
||||
# as a text file even if it is serializable as json.
|
||||
# References:
|
||||
# https://stackoverflow.com/questions/7487869/is-this-simple-string-considered-valid-json
|
||||
# https://www.ietf.org/rfc/rfc4627.txt
|
||||
return not isinstance(output, str)
|
||||
except json.JSONDecodeError:
|
||||
return False
|
||||
with self._ctx.open() as file:
|
||||
zip = zipfile.ZipFile(file)
|
||||
|
||||
# NOTE(robinson) - .docx and .xlsx files are actually a zip file with a .docx/.xlsx
|
||||
# extension. If the MIME type is application/octet-stream, we check if it's a
|
||||
# .docx/.xlsx file by looking for expected filenames within the zip file.
|
||||
filenames = [f.filename for f in zip.filelist]
|
||||
|
||||
if all(f in filenames for f in ("word/document.xml",)):
|
||||
return FileType.DOCX
|
||||
|
||||
if all(f in filenames for f in ("xl/workbook.xml",)):
|
||||
return FileType.XLSX
|
||||
|
||||
if all(f in filenames for f in ("ppt/presentation.xml",)):
|
||||
return FileType.PPTX
|
||||
|
||||
return FileType.ZIP
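The marker-file check above can be demonstrated with a synthetic archive; this is an illustration of the heuristic only, not the library code path itself.

```python
# Synthetic-archive illustration of the marker-file heuristic above: the
# presence of "word/document.xml" is what identifies a DOCX package.
import io
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as z:
    z.writestr("word/document.xml", "<w:document/>")
    z.writestr("docProps/core.xml", "<cp:coreProperties/>")
buf.seek(0)

filenames = [f.filename for f in zipfile.ZipFile(buf).filelist]
print("word/document.xml" in filenames)  # -> True, so this archive reads as DOCX
```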
|
||||
|
||||
|
||||
def _read_file_start_for_type_check(
|
||||
@ -379,9 +635,9 @@ def _resolve_symlink(file_path: str) -> str:
|
||||
_P = ParamSpec("_P")
|
||||
|
||||
|
||||
def add_metadata(func: Callable[_P, List[Element]]) -> Callable[_P, List[Element]]:
|
||||
def add_metadata(func: Callable[_P, list[Element]]) -> Callable[_P, list[Element]]:
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> List[Element]:
|
||||
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> list[Element]:
|
||||
elements = func(*args, **kwargs)
|
||||
call_args = get_call_args_applying_defaults(func, *args, **kwargs)
|
||||
include_metadata = call_args.get("include_metadata", True)
|
||||
@ -412,7 +668,7 @@ def add_metadata(func: Callable[_P, List[Element]]) -> Callable[_P, List[Element
|
||||
|
||||
def add_filetype(
|
||||
filetype: FileType,
|
||||
) -> Callable[[Callable[_P, List[Element]]], Callable[_P, List[Element]]]:
|
||||
) -> Callable[[Callable[_P, list[Element]]], Callable[_P, list[Element]]]:
|
||||
"""Post-process element-metadata for list[Element] from partitioning.
|
||||
|
||||
This decorator adds a post-processing step to a document partitioner.
|
||||
@ -423,9 +679,9 @@ def add_filetype(
|
||||
|
||||
"""
|
||||
|
||||
def decorator(func: Callable[_P, List[Element]]) -> Callable[_P, List[Element]]:
|
||||
def decorator(func: Callable[_P, list[Element]]) -> Callable[_P, list[Element]]:
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> List[Element]:
|
||||
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> list[Element]:
|
||||
elements = func(*args, **kwargs)
|
||||
params = get_call_args_applying_defaults(func, *args, **kwargs)
|
||||
include_metadata = params.get("include_metadata", True)
|
||||
@ -447,10 +703,10 @@ def add_filetype(
|
||||
|
||||
def add_metadata_with_filetype(
|
||||
filetype: FileType,
|
||||
) -> Callable[[Callable[_P, List[Element]]], Callable[_P, List[Element]]]:
|
||||
) -> Callable[[Callable[_P, list[Element]]], Callable[_P, list[Element]]]:
|
||||
"""..."""
|
||||
|
||||
def decorator(func: Callable[_P, List[Element]]) -> Callable[_P, List[Element]]:
|
||||
def decorator(func: Callable[_P, list[Element]]) -> Callable[_P, list[Element]]:
|
||||
return add_filetype(filetype=filetype)(add_metadata(func))
|
||||
|
||||
return decorator
|
||||
|
@ -76,12 +76,14 @@ class FileType(enum.Enum):
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def from_mime_type(cls, mime_type: str) -> FileType | None:
|
||||
def from_mime_type(cls, mime_type: str | None) -> FileType | None:
|
||||
"""Select a FileType member based on a MIME-type.
|
||||
|
||||
Returns `None` when `mime_type` is `None` or does not map to the canonical MIME-type of a
|
||||
`FileType` member or one of its alias MIME-types.
|
||||
"""
|
||||
if mime_type is None:
|
||||
return None
|
||||
# -- not super efficient but plenty fast enough for once-or-twice-per-file use and avoids
|
||||
# -- limitations on defining a class variable on an Enum.
|
||||
for m in cls.__members__.values():
|
||||
@ -434,6 +436,3 @@ class FileType(enum.Enum):
|
||||
"inode/x-empty",
|
||||
cast(list[str], []),
|
||||
)
|
||||
|
||||
|
||||
PLAIN_TEXT_EXTENSIONS = ".csv .eml .html .json .md .org .p7s .rst .rtf .tab .text .tsv .txt".split()
|
||||
|
@ -1,10 +1,23 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Dict, Optional, Tuple, Union
|
||||
|
||||
from typing_extensions import TypeAlias
|
||||
|
||||
FrequencyDict: TypeAlias = "dict[tuple[str, int | None], int]"
|
||||
"""Like:
|
||||
{
|
||||
("ListItem", 0): 2,
|
||||
("NarrativeText", None): 2,
|
||||
("Title", 0): 5,
|
||||
("UncategorizedText", None): 6,
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def get_element_type_frequency(
|
||||
elements: str,
|
||||
) -> Union[Dict[Tuple[str, Optional[int]], int], Dict]:
|
||||
) -> FrequencyDict:
|
||||
"""
|
||||
Calculate the frequency of Element Types from a list of elements.
|
||||
|
||||
@ -13,7 +26,7 @@ def get_element_type_frequency(
|
||||
Returns:
|
||||
Element type and its frequency in dictionary format.
|
||||
"""
|
||||
frequency: Dict = {}
|
||||
frequency: dict[tuple[str, int | None], int] = {}
|
||||
if len(elements) == 0:
|
||||
return frequency
|
||||
for element in json.loads(elements):
|
||||
@ -28,14 +41,14 @@ def get_element_type_frequency(
|
||||
|
||||
|
||||
def calculate_element_type_percent_match(
|
||||
output: Dict,
|
||||
source: Dict,
|
||||
output: FrequencyDict,
|
||||
source: FrequencyDict,
|
||||
category_depth_weight: float = 0.5,
|
||||
) -> float:
|
||||
"""
|
||||
Calculate the percent match between two frequency dictionary. Intended to use with
|
||||
`get_element_type_frequency` function. The function counts the absolute exact match
|
||||
(type and depth), and counts the weighted match (correct type but different depth),
|
||||
"""Calculate the percent match between two frequency dictionary.
|
||||
|
||||
Intended to use with `get_element_type_frequency` function. The function counts the absolute
|
||||
exact match (type and depth), and counts the weighted match (correct type but different depth),
|
||||
then normalized with source's total elements.
|
||||
"""
|
||||
if len(output) == 0 or len(source) == 0:
|
||||
@ -46,8 +59,8 @@ def calculate_element_type_percent_match(
|
||||
total_source_element_count = 0
|
||||
total_match_element_count = 0
|
||||
|
||||
unmatched_depth_output = {}
|
||||
unmatched_depth_source = {}
|
||||
unmatched_depth_output: dict[str, int] = {}
|
||||
unmatched_depth_source: dict[str, int] = {}
|
||||
|
||||
# loop through the output list to find match with source
|
||||
for k, _ in output_copy.items():
|
||||
@ -80,12 +93,12 @@ def calculate_element_type_percent_match(
|
||||
return min(max(total_match_element_count / total_source_element_count, 0.0), 1.0)
|
||||
|
||||
|
||||
def _convert_to_frequency_without_depth(d: Dict) -> Dict:
|
||||
def _convert_to_frequency_without_depth(d: FrequencyDict) -> dict[str, int]:
|
||||
"""
|
||||
Takes in element frequency with depth of format (type, depth): value
|
||||
and converts to dictionary without depth of format type: value
|
||||
"""
|
||||
res = {}
|
||||
res: dict[str, int] = {}
|
||||
for k, v in d.items():
|
||||
element_type = k[0]
|
||||
if element_type not in res:
|
||||
|
@ -184,11 +184,11 @@ def partition(
|
||||
"The headers kwarg will be ignored.",
|
||||
)
|
||||
file_type = detect_filetype(
|
||||
filename=filename,
|
||||
file_path=filename,
|
||||
file=file,
|
||||
file_filename=metadata_filename,
|
||||
content_type=content_type,
|
||||
encoding=encoding,
|
||||
content_type=content_type,
|
||||
metadata_file_path=metadata_filename,
|
||||
)
|
||||
|
||||
if file is not None:
|
||||
@ -471,12 +471,13 @@ def file_and_type_from_url(
|
||||
response = requests.get(url, headers=headers, verify=ssl_verify, timeout=request_timeout)
|
||||
file = io.BytesIO(response.content)
|
||||
|
||||
content_type = (
|
||||
content_type or response.headers.get("Content-Type", "").split(";")[0].strip().lower()
|
||||
)
|
||||
encoding = response.headers.get("Content-Encoding", "utf-8")
|
||||
if content_type := content_type or response.headers.get("Content-Type", None):
|
||||
content_type = content_type.split(";")[0].strip().lower()
|
||||
|
||||
filetype = detect_filetype(file=file, content_type=content_type, encoding=encoding)
|
||||
# -- non-None when response is textual --
|
||||
encoding = response.encoding
|
||||
|
||||
filetype = detect_filetype(file=file, encoding=encoding, content_type=content_type)
|
||||
return file, filetype
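The header handling above normalizes the asserted content-type before it is passed to detection; the normalization itself is just the following, shown here with an illustrative header value.

```python
# The same normalization applied above: drop any parameters (e.g. charset)
# and lower-case the media type.
raw = "Text/HTML; charset=UTF-8"
print(raw.split(";")[0].strip().lower())  # -> text/html
```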
|
||||
|
||||
|
||||
|
@ -133,10 +133,12 @@ def elements_to_json(
|
||||
filename: Optional[str] = None,
|
||||
indent: int = 4,
|
||||
encoding: str = "utf-8",
|
||||
) -> Optional[str]:
|
||||
"""Saves a list of elements to a JSON file if filename is specified.
|
||||
) -> str:
|
||||
"""Serialize `elements` to a JSON array.
|
||||
|
||||
Otherwise, return the list of elements as a string.
|
||||
Also writes the JSON to `filename` if it is provided, encoded using `encoding`.
|
||||
|
||||
The JSON is returned as a string.
|
||||
"""
|
||||
# -- serialize `elements` as a JSON array (str) --
|
||||
precision_adjusted_elements = _fix_metadata_field_precision(elements)
|
||||
@ -146,7 +148,6 @@ def elements_to_json(
|
||||
if filename is not None:
|
||||
with open(filename, "w", encoding=encoding) as f:
|
||||
f.write(json_str)
|
||||
return None
|
||||
|
||||
return json_str
|
||||
|
||||
|