From 1b8bf318b8cf815d4101fb673cb22e11f36e569b Mon Sep 17 00:00:00 2001 From: cragwolfe Date: Tue, 21 Feb 2023 17:02:05 -0800 Subject: [PATCH] refactor: move processing logic to IngestDoc (#248) Moves the logic to partition a raw document to the IngestDoc level to allow for easier overrides for subclasses of IngestDoc. --- CHANGELOG.md | 2 +- Ingest.md | 1 + unstructured/ingest/connector/s3_connector.py | 4 ++-- .../ingest/doc_processor/generalized.py | 19 ++----------------- unstructured/ingest/interfaces.py | 19 ++++++++++++++++++- 5 files changed, 24 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8feaab3e6..67605df56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## 0.4.12-dev1 -* Adds console_entrypoint for unstructured-ingest and more structure/docs related to ingest. +* Adds console_entrypoint for unstructured-ingest, other structure/doc updates related to ingest. ## 0.4.11 diff --git a/Ingest.md b/Ingest.md index efcc6f013..1067575bf 100644 --- a/Ingest.md +++ b/Ingest.md @@ -55,6 +55,7 @@ The `main.py` flags of --re-download/--no-re-download , --download-dir, --preser In checklist form, the above steps are summarized as: - [ ] Create a new module under [unstructured/ingest/connector/](unstructured/ingest/connector/) implementing the 3 abstract base classes, similar to [unstructured/ingest/connector/s3_connector.py](unstructured/ingest/connector/s3_connector.py). + - [ ] The subclass of `BaseIngestDoc` overrides `process_file()` if extra processing logic is needed other than what is provided by [auto.partition()](unstructured/partition/auto.py). - [ ] Update [unstructured/ingest/main.py](unstructured/ingest/main.py) with support for the new connector. - [ ] Create a folder under [examples/ingest](examples/ingest) that includes at least one well documented script. - [ ] Add a script test_unstructured_ingest/test-ingest-\.sh. It's json output files should have a total of no more than 100K. diff --git a/unstructured/ingest/connector/s3_connector.py b/unstructured/ingest/connector/s3_connector.py index 14828744b..e0b5f17ca 100644 --- a/unstructured/ingest/connector/s3_connector.py +++ b/unstructured/ingest/connector/s3_connector.py @@ -107,12 +107,12 @@ class S3IngestDoc(BaseIngestDoc): print(f"fetching {self} - PID: {os.getpid()}") s3_cli.download_file(self.config.s3_bucket, self.s3_key, self._tmp_download_file()) - def write_result(self, result): + def write_result(self): """Write the structured json result for this doc. result must be json serializable.""" output_filename = self._output_filename() output_filename.parent.mkdir(parents=True, exist_ok=True) with open(output_filename, "w") as output_f: - output_f.write(json.dumps(result, ensure_ascii=False, indent=2)) + output_f.write(json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2)) print(f"Wrote {output_filename}") @property diff --git a/unstructured/ingest/doc_processor/generalized.py b/unstructured/ingest/doc_processor/generalized.py index de2818cc1..9adf13548 100644 --- a/unstructured/ingest/doc_processor/generalized.py +++ b/unstructured/ingest/doc_processor/generalized.py @@ -2,9 +2,6 @@ import logging -from unstructured.partition.auto import partition -from unstructured.staging.base import convert_to_isd - from unstructured_inference.models.detectron2 import MODEL_TYPES @@ -25,24 +22,12 @@ def process_document(doc): # in the future, get_file_handle() could also be supported doc.get_file() - # accessing the .filename property could lazily call .get_file(), but - # keeping them as two distinct calls for end-user transparency for now - print(f"Processing {doc.filename}") - - elements = partition(filename=doc.filename) - - isd_elems = convert_to_isd(elements) - - isd_elems_no_filename = [] - for elem in isd_elems: - # type: ignore - elem["metadata"].pop("filename") # type: ignore[attr-defined] - isd_elems_no_filename.append(elem) + isd_elems_no_filename = doc.process_file() # Note, this may be a no-op if the IngestDoc doesn't do anything to persist # the results. Instead, the MainProcess (caller) may work with the aggregate # results across all docs in memory. - doc.write_result(isd_elems_no_filename) + doc.write_result() except Exception: # TODO(crag) save the exception instead of print? diff --git a/unstructured/ingest/interfaces.py b/unstructured/ingest/interfaces.py index fd4d486dd..fce597feb 100644 --- a/unstructured/ingest/interfaces.py +++ b/unstructured/ingest/interfaces.py @@ -3,6 +3,9 @@ through Unstructured.""" from abc import ABC, abstractmethod +from unstructured.partition.auto import partition +from unstructured.staging.base import convert_to_isd + class BaseConnector(ABC): """Abstract Base Class for a connector to a remote source, e.g. S3 or Google Drive.""" @@ -80,6 +83,20 @@ class BaseIngestDoc(ABC): pass @abstractmethod - def write_result(self, result): + def write_result(self): """Write the structured json result for this doc. result must be json serializable.""" pass + + def process_file(self): + print(f"Processing {self.filename}") + + elements = partition(filename=self.filename) + isd_elems = convert_to_isd(elements) + + self.isd_elems_no_filename = [] + for elem in isd_elems: + # type: ignore + elem["metadata"].pop("filename") # type: ignore[attr-defined] + self.isd_elems_no_filename.append(elem) + + return self.isd_elems_no_filename