chore: DRY ingest connectors (#769)

ryannikolaidis 2023-06-26 13:12:05 -07:00 committed by GitHub
parent 95f02f290d
commit a5c7e5b41e
17 changed files with 146 additions and 441 deletions
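
Context for the diffs below: this refactor deletes the copy-pasted `cleanup`, `cleanup_file`, `has_output`, and `write_result` implementations from each connector and centralizes them in `unstructured/ingest/interfaces.py` as `ConnectorCleanupMixin`, `IngestDocCleanupMixin`, now-concrete `BaseIngestDoc.has_output`/`write_result` methods, and a `skip_if_file_exists` decorator. A minimal sketch of the pattern a connector follows after this change (the `Example*` class names and the `doc_id` field are illustrative, not part of this commit; the sketch assumes `standard_config` is the first dataclass field of `BaseIngestDoc`, as in this diff):

```python
from dataclasses import dataclass
from pathlib import Path

from unstructured.ingest.interfaces import (
    BaseConnector,
    BaseIngestDoc,
    ConnectorCleanupMixin,
    IngestDocCleanupMixin,
)


@dataclass
class ExampleIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
    # cleanup_file() now comes from IngestDocCleanupMixin; has_output() and
    # write_result() come from the now-concrete BaseIngestDoc methods.
    doc_id: str  # illustrative field, not part of the real interface

    @property
    def filename(self):
        return Path(self.standard_config.download_dir) / f"{self.doc_id}.txt"

    @property
    def _output_filename(self):
        return Path(self.standard_config.output_dir) / f"{self.doc_id}.json"

    @BaseIngestDoc.skip_if_file_exists
    def get_file(self):
        self.filename.parent.mkdir(parents=True, exist_ok=True)
        self.filename.write_text("...fetched content...")


class ExampleConnector(ConnectorCleanupMixin, BaseConnector):
    # cleanup() now comes from ConnectorCleanupMixin; the per-connector
    # self.cleanup_files bookkeeping removed below is no longer needed.
    def initialize(self):
        pass

    def get_ingest_docs(self):
        return [ExampleIngestDoc(standard_config=self.standard_config, doc_id="example")]
```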

View File

@@ -1,3 +1,13 @@
## 0.7.10-dev0
### Enhancements
* DRY connector refactor
### Features
### Fixes
## 0.7.9
### Enhancements

View File

@@ -6,7 +6,7 @@ set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd "$SCRIPT_DIR"/.. || exit 1
if [[ "$(find test_unstructured_ingest/expected-structured-output/biomed-ingest-output-api/ -type f -size +10k | wc -l)" != 2 ]]; then
if [[ "$(find test_unstructured_ingest/expected-structured-output/biomed-ingest-output-api/ -type f -size +10k | wc -l)" -ne 2 ]]; then
echo "The test fixtures in test_unstructured_ingest/expected-structured-output/biomed-ingest-output-api/ look suspicious. At least one of the files is too small."
echo "Did you overwrite test fixtures with bad outputs?"
exit 1
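
(Aside: inside `[[ ... ]]`, `!=` is a string comparison while `-ne` compares its operands as integers, so `-ne` also tolerates the leading whitespace that some `wc -l` implementations print; that is presumably the motivation for this change, which is repeated in the next script.)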

View File

@@ -6,7 +6,7 @@ set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd "$SCRIPT_DIR"/.. || exit 1
if [[ "$(find test_unstructured_ingest/expected-structured-output/biomed-ingest-output-path/ -type f -size +10k | wc -l)" != 1 ]]; then
if [[ "$(find test_unstructured_ingest/expected-structured-output/biomed-ingest-output-path/ -type f -size +10k | wc -l)" -ne 1 ]]; then
echo "The test fixtures in test_unstructured_ingest/expected-structured-output/biomed-ingest-output-path/ look suspicious. At least one of the files is too small."
echo "Did you overwrite test fixtures with bad outputs?"
exit 1

View File

@@ -46,6 +46,10 @@ class TestIngestDoc(BaseIngestDoc):
def filename(self):
return TEST_FILE_PATH
@property
def _output_filename(self):
return TEST_FILE_PATH + ".json"
@property
def source_url(self) -> str:
return TEST_SOURCE_URL

View File

@@ -1 +1 @@
__version__ = "0.7.9" # pragma: no cover
__version__ = "0.7.10-dev0" # pragma: no cover

View File

@@ -1,4 +1,3 @@
import json
import os
import urllib.request
from dataclasses import dataclass
@@ -15,6 +14,8 @@ from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
@@ -106,7 +107,7 @@ class SimpleBiomedConfig(BaseConnectorConfig):
@dataclass
class BiomedIngestDoc(BaseIngestDoc):
class BiomedIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
config: SimpleBiomedConfig
file_meta: BiomedFileMeta
@@ -114,6 +115,7 @@ class BiomedIngestDoc(BaseIngestDoc):
def filename(self):
return Path(self.file_meta.download_filepath).resolve() # type: ignore
@property
def _output_filename(self):
return Path(f"{self.file_meta.output_filepath}.json").resolve()
@@ -126,42 +128,23 @@ class BiomedIngestDoc(BaseIngestDoc):
logger.debug(f"Cleaning up {self}")
Path.unlink(self.filename)
def has_output(self):
"""Determine if structured output for this doc already exists."""
output_filename = self._output_filename()
return output_filename.is_file() and output_filename.stat()
@BaseIngestDoc.skip_if_file_exists
def get_file(self):
download_path = self.file_meta.download_filepath # type: ignore
dir_ = Path(os.path.dirname(download_path)) # type: ignore
if not dir_.is_dir():
logger.debug(f"Creating directory: {dir_}")
if dir_:
dir_.mkdir(parents=True, exist_ok=True)
urllib.request.urlretrieve(
self.file_meta.ftp_path, # type: ignore
self.file_meta.download_filepath,
)
logger.debug(f"File downloaded: {self.file_meta.download_filepath}")
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
if self.standard_config.download_only:
return
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w") as output_f:
output_f.write(
json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2),
)
logger.info(f"Wrote {output_filename}")
class BiomedConnector(BaseConnector):
class BiomedConnector(ConnectorCleanupMixin, BaseConnector):
"""Objects of this class support fetching documents from Biomedical literature FTP directory"""
config: SimpleBiomedConfig
@@ -172,9 +155,6 @@ class BiomedConnector(BaseConnector):
config: SimpleBiomedConfig,
):
super().__init__(standard_config, config)
self.cleanup_files = (
not self.standard_config.preserve_downloads and not self.standard_config.download_only
)
def _list_objects_api(self):
def urls_to_metadata(urls):
@@ -300,26 +280,6 @@ class BiomedConnector(BaseConnector):
return files
def cleanup(self, cur_dir=None):
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.standard_config.download_dir
if cur_dir is None or not Path(cur_dir).is_dir():
return
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
pass

View File

@@ -1,5 +1,4 @@
import datetime as dt
import json
import os
from dataclasses import dataclass
from pathlib import Path
@@ -9,6 +8,8 @@ from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
@@ -45,7 +46,7 @@ class SimpleDiscordConfig(BaseConnectorConfig):
@dataclass
class DiscordIngestDoc(BaseIngestDoc):
class DiscordIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
"""Class encapsulating fetching a doc and writing processed results (but not
doing the processing!).
Also includes a cleanup method. When things go wrong and the cleanup
@@ -64,17 +65,15 @@ class DiscordIngestDoc(BaseIngestDoc):
channel_file = self.channel + ".txt"
return Path(self.standard_config.download_dir) / channel_file
@property
def _output_filename(self):
output_file = self.channel + ".json"
return Path(self.standard_config.output_dir) / output_file
def has_output(self):
"""Determine if structured output for this doc already exists."""
return self._output_filename().is_file() and os.path.getsize(self._output_filename())
def _create_full_tmp_dir_path(self):
self._tmp_download_file().parent.mkdir(parents=True, exist_ok=True)
@BaseIngestDoc.skip_if_file_exists
@requires_dependencies(dependencies=["discord"], extras="discord")
def get_file(self):
"""Actually fetches the data from discord and stores it locally."""
@@ -83,20 +82,9 @@ class DiscordIngestDoc(BaseIngestDoc):
from discord.ext import commands
self._create_full_tmp_dir_path()
if (
not self.standard_config.re_download
and self._tmp_download_file().is_file()
and os.path.getsize(self._tmp_download_file())
):
if self.config.verbose:
logger.debug(f"File exists: {self._tmp_download_file()}, skipping download")
return
if self.config.verbose:
logger.debug(f"fetching {self} - PID: {os.getpid()}")
messages: List[discord.Message] = []
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix=">", intents=intents)
@@ -123,28 +111,13 @@ class DiscordIngestDoc(BaseIngestDoc):
for m in messages:
f.write(m.content + "\n")
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w") as output_f:
output_f.write(json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2))
logger.info(f"Wrote {output_filename}")
@property
def filename(self):
"""The filename of the file created from a discord channel"""
return self._tmp_download_file()
def cleanup_file(self):
"""Removes the local copy the file after successful processing."""
if not self.standard_config.preserve_downloads:
if self.config.verbose:
logger.info(f"cleaning up channel {self.channel}")
os.unlink(self._tmp_download_file())
class DiscordConnector(BaseConnector):
class DiscordConnector(ConnectorCleanupMixin, BaseConnector):
"""Objects of this class support fetching document(s) from"""
config: SimpleDiscordConfig
@@ -155,25 +128,6 @@ class DiscordConnector(BaseConnector):
config: SimpleDiscordConfig,
):
super().__init__(standard_config, config)
self.cleanup_files = not standard_config.preserve_downloads
def cleanup(self, cur_dir=None):
"""cleanup linginering empty sub-dirs from s3 paths, but leave remaining files
(and their paths) in tact as that indicates they were not processed"""
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.standard_config.download_dir
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
"""Verify that can get metadata for an object, validates connections info."""

View File

@@ -1,4 +1,3 @@
import json
import os
import re
from dataclasses import dataclass, field
@@ -9,6 +8,8 @@ from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
@@ -60,7 +61,7 @@ class SimpleFsspecConfig(BaseConnectorConfig):
@dataclass
class FsspecIngestDoc(BaseIngestDoc):
class FsspecIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
"""Class encapsulating fetching a doc and writing processed results (but not
doing the processing!).
@@ -77,70 +78,36 @@
"",
)
@property
def _output_filename(self):
return (
Path(self.standard_config.output_dir)
/ f"{self.remote_file_path.replace(f'{self.config.dir_path}/', '')}.json"
)
def has_output(self):
"""Determine if structured output for this doc already exists."""
return self._output_filename().is_file() and os.path.getsize(
self._output_filename(),
)
def _create_full_tmp_dir_path(self):
"""Includes "directories" in the object path"""
self._tmp_download_file().parent.mkdir(parents=True, exist_ok=True)
@BaseIngestDoc.skip_if_file_exists
def get_file(self):
"""Fetches the file from the current filesystem and stores it locally."""
from fsspec import AbstractFileSystem, get_filesystem_class
self._create_full_tmp_dir_path()
if (
not self.standard_config.re_download
and self._tmp_download_file().is_file()
and os.path.getsize(self._tmp_download_file())
):
logger.debug(f"File exists: {self._tmp_download_file()}, skipping download")
return
fs: AbstractFileSystem = get_filesystem_class(self.config.protocol)(
**self.config.access_kwargs,
)
logger.debug(f"Fetching {self} - PID: {os.getpid()}")
fs.get(rpath=self.remote_file_path, lpath=self._tmp_download_file().as_posix())
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
if self.standard_config.download_only:
return
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w") as output_f:
output_f.write(
json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2),
)
logger.info(f"Wrote {output_filename}")
@property
def filename(self):
"""The filename of the file after downloading from cloud"""
return self._tmp_download_file()
def cleanup_file(self):
"""Removes the local copy of the file after successful processing."""
if not self.standard_config.preserve_downloads and not self.standard_config.download_only:
logger.debug(f"Cleaning up {self}")
try:
os.unlink(self._tmp_download_file())
except OSError as e: # Don't think we need to raise an exception
logger.debug(f"Failed to remove {self._tmp_download_file()} due to {e}")
class FsspecConnector(BaseConnector):
class FsspecConnector(ConnectorCleanupMixin, BaseConnector):
"""Objects of this class support fetching document(s) from"""
config: SimpleFsspecConfig
@@ -157,27 +124,6 @@ class FsspecConnector(BaseConnector):
self.fs: AbstractFileSystem = get_filesystem_class(self.config.protocol)(
**self.config.access_kwargs,
)
self.cleanup_files = (
not standard_config.preserve_downloads and not standard_config.download_only
)
def cleanup(self, cur_dir=None):
"""cleanup linginering empty sub-dirs from cloud paths, but leave remaining files
(and their paths) in tact as that indicates they were not processed"""
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.standard_config.download_dir
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
"""Verify that can get metadata for an object, validates connections info."""

View File

@@ -1,5 +1,4 @@
import fnmatch
import json
import os
from dataclasses import dataclass, field
from pathlib import Path
@@ -9,6 +8,8 @@ from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
)
from unstructured.ingest.logger import logger
@@ -23,7 +24,7 @@ class SimpleGitConfig(BaseConnectorConfig):
@dataclass
class GitIngestDoc(BaseIngestDoc):
class GitIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
config: SimpleGitConfig = field(repr=False)
path: str
@@ -31,6 +32,7 @@ class GitIngestDoc(BaseIngestDoc):
def filename(self):
return (Path(self.standard_config.download_dir) / self.path).resolve()
@property
def _output_filename(self):
return Path(self.standard_config.output_dir) / f"{self.path}.json"
@@ -38,70 +40,21 @@ class GitIngestDoc(BaseIngestDoc):
"""includes directories in in the gitlab repository"""
self.filename.parent.mkdir(parents=True, exist_ok=True)
def cleanup_file(self):
"""Removes the local copy of the file (or anything else) after successful processing."""
if not self.standard_config.preserve_downloads and not self.standard_config.download_only:
logger.debug(f"Cleaning up {self}")
os.unlink(self.filename)
@BaseIngestDoc.skip_if_file_exists
def get_file(self):
"""Fetches the "remote" doc and stores it locally on the filesystem."""
self._create_full_tmp_dir_path()
if (
not self.standard_config.re_download
and self.filename.is_file()
and self.filename.stat()
):
logger.debug(f"File exists: {self.filename}, skipping download")
return
logger.debug(f"Fetching {self} - PID: {os.getpid()}")
self._fetch_and_write()
def _fetch_and_write(self) -> None:
raise NotImplementedError()
def has_output(self):
"""Determine if structured output for this doc already exists."""
output_filename = self._output_filename()
return output_filename.is_file() and output_filename.stat()
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
if self.standard_config.download_only:
return
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w", encoding="utf8") as output_f:
json.dump(self.isd_elems_no_filename, output_f, ensure_ascii=False, indent=2)
logger.info(f"Wrote {output_filename}")
@dataclass
class GitConnector(BaseConnector):
class GitConnector(ConnectorCleanupMixin, BaseConnector):
config: SimpleGitConfig
def __post_init__(self) -> None:
self.cleanup_files = (
not self.standard_config.preserve_downloads and not self.standard_config.download_only
)
def cleanup(self, cur_dir=None):
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.standard_config.download_dir
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
pass

View File

@@ -67,7 +67,6 @@ class GitHubIngestDoc(GitIngestDoc):
@dataclass
class GitHubConnector(GitConnector):
def __post_init__(self) -> None:
super().__post_init__()
from github import Github
self.github = Github(self.config.access_token)
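
(Note: `GitConnector` no longer defines `__post_init__`, so the `super().__post_init__()` call is dropped from this connector and from the GitLab connector below.)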

View File

@@ -47,7 +47,6 @@ class GitLabIngestDoc(GitIngestDoc):
@dataclass
class GitLabConnector(GitConnector):
def __post_init__(self) -> None:
super().__post_init__()
from gitlab import Gitlab
self.gitlab = Gitlab(self.config.url, private_token=self.config.access_token)

View File

@@ -12,6 +12,8 @@ from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
@@ -83,7 +85,7 @@ class SimpleGoogleDriveConfig(BaseConnectorConfig):
@dataclass
class GoogleDriveIngestDoc(BaseIngestDoc):
class GoogleDriveIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
config: SimpleGoogleDriveConfig
file_meta: Dict
@@ -91,36 +93,16 @@ class GoogleDriveIngestDoc(BaseIngestDoc):
def filename(self):
return Path(self.file_meta.get("download_filepath")).resolve() # type: ignore
@property
def _output_filename(self):
return Path(f"{self.file_meta.get('output_filepath')}.json").resolve()
def cleanup_file(self):
if (
not self.standard_config.preserve_downloads
and self.filename.is_file()
and not self.standard_config.download_only
):
logger.debug(f"Cleaning up {self}")
Path.unlink(self.filename)
def has_output(self):
"""Determine if structured output for this doc already exists."""
output_filename = self._output_filename()
return output_filename.is_file() and output_filename.stat()
@BaseIngestDoc.skip_if_file_exists
@requires_dependencies(["googleapiclient"], extras="google-drive")
def get_file(self):
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
if (
not self.standard_config.re_download
and self.filename.is_file()
and self.filename.stat()
):
logger.debug(f"File exists: {self.filename}, skipping download")
return
self.config.service = create_service_account_object(self.config.service_account_key)
if self.file_meta.get("mimeType", "").startswith("application/vnd.google-apps"):
@@ -143,7 +125,6 @@ class GoogleDriveIngestDoc(BaseIngestDoc):
request = self.config.service.files().get_media(fileId=self.file_meta.get("id"))
file = io.BytesIO()
downloader = MediaIoBaseDownload(file, request)
downloaded = False
try:
while downloaded is False:
@@ -173,23 +154,19 @@ class GoogleDriveIngestDoc(BaseIngestDoc):
"""Write the structured json result for this doc. result must be json serializable."""
if self.standard_config.download_only:
return
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w") as output_f:
self._output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(self._output_filename, "w") as output_f:
output_f.write(json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2))
logger.info(f"Wrote {output_filename}")
logger.info(f"Wrote {self._output_filename}")
class GoogleDriveConnector(BaseConnector):
class GoogleDriveConnector(ConnectorCleanupMixin, BaseConnector):
"""Objects of this class support fetching documents from Google Drive"""
config: SimpleGoogleDriveConfig
def __init__(self, standard_config: StandardConnectorConfig, config: SimpleGoogleDriveConfig):
super().__init__(standard_config, config)
self.cleanup_files = (
not self.standard_config.preserve_downloads and not self.standard_config.download_only
)
def _list_objects(self, drive_id, recursive=False):
files = []
@@ -262,26 +239,6 @@ class GoogleDriveConnector(BaseConnector):
)
return files
def cleanup(self, cur_dir=None):
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.standard_config.download_dir
if cur_dir is None or not Path(cur_dir).is_dir():
return
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
pass

View File

@@ -1,6 +1,5 @@
import fnmatch
import glob
import json
import os
from dataclasses import dataclass
from pathlib import Path
@@ -51,26 +50,13 @@ class LocalIngestDoc(BaseIngestDoc):
"""Not applicable to local file system"""
pass
@property
def _output_filename(self):
return (
Path(self.standard_config.output_dir)
/ f"{self.path.replace(f'{self.config.input_path}/', '')}.json"
)
def has_output(self):
"""Determine if structured output for this doc already exists."""
return self._output_filename().is_file() and os.path.getsize(self._output_filename())
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
if self.standard_config.download_only:
return
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w") as output_f:
output_f.write(json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2))
logger.info(f"Wrote {output_filename}")
class LocalConnector(BaseConnector):
"""Objects of this class support fetching document(s) from local file system"""

View File

@@ -1,4 +1,3 @@
import json
import os
from dataclasses import dataclass, field
from pathlib import Path
@@ -8,6 +7,8 @@ from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
@@ -32,7 +33,7 @@ class SimpleRedditConfig(BaseConnectorConfig):
@dataclass
class RedditIngestDoc(BaseIngestDoc):
class RedditIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
config: SimpleRedditConfig = field(repr=False)
post: "Submission"
@@ -40,53 +41,26 @@ class RedditIngestDoc(BaseIngestDoc):
def filename(self) -> Path:
return (Path(self.standard_config.download_dir) / f"{self.post.id}.md").resolve()
@property
def _output_filename(self):
return Path(self.standard_config.output_dir) / f"{self.post.id}.json"
def _create_full_tmp_dir_path(self):
self.filename.parent.mkdir(parents=True, exist_ok=True)
def cleanup_file(self):
"""Removes the local copy of the file (or anything else) after successful processing."""
if not self.standard_config.preserve_downloads and not self.standard_config.download_only:
logger.debug(f"Cleaning up {self}")
os.unlink(self.filename)
@BaseIngestDoc.skip_if_file_exists
def get_file(self):
"""Fetches the "remote" doc and stores it locally on the filesystem."""
self._create_full_tmp_dir_path()
if (
not self.standard_config.re_download
and self.filename.is_file()
and self.filename.stat()
):
logger.debug(f"File exists: {self.filename}, skipping download")
return
logger.debug(f"Fetching {self} - PID: {os.getpid()}")
# Write the title plus the body, if any
text_to_write = f"# {self.post.title}\n{self.post.selftext}"
with open(self.filename, "w", encoding="utf8") as f:
f.write(text_to_write)
def has_output(self):
"""Determine if structured output for this doc already exists."""
output_filename = self._output_filename()
return output_filename.is_file() and output_filename.stat()
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
if self.standard_config.download_only:
return
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w", encoding="utf8") as output_f:
json.dump(self.isd_elems_no_filename, output_f, ensure_ascii=False, indent=2)
logger.info(f"Wrote {output_filename}")
@requires_dependencies(["praw"], extras="reddit")
class RedditConnector(BaseConnector):
class RedditConnector(ConnectorCleanupMixin, BaseConnector):
config: SimpleRedditConfig
def __init__(self, standard_config: StandardConnectorConfig, config: SimpleRedditConfig):
@@ -98,25 +72,6 @@ class RedditConnector(BaseConnector):
client_secret=config.client_secret,
user_agent=config.user_agent,
)
self.cleanup_files = (
not standard_config.preserve_downloads and not standard_config.download_only
)
def cleanup(self, cur_dir=None):
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.standard_config.download_dir
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
pass

View File

@@ -1,17 +1,15 @@
import json
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
@@ -59,7 +57,7 @@ class SimpleSlackConfig(BaseConnectorConfig):
@dataclass
class SlackIngestDoc(BaseIngestDoc):
class SlackIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
"""Class encapsulating fetching a doc and writing processed results (but not
doing the processing!).
@@ -80,30 +78,23 @@ class SlackIngestDoc(BaseIngestDoc):
channel_file = self.channel + ".txt"
return Path(self.standard_config.download_dir) / channel_file
@property
def _output_filename(self):
output_file = self.channel + ".json"
return Path(self.standard_config.output_dir) / output_file
def has_output(self):
"""Determine if structured output for this doc already exists."""
return self._output_filename().is_file() and os.path.getsize(self._output_filename())
def _create_full_tmp_dir_path(self):
self._tmp_download_file().parent.mkdir(parents=True, exist_ok=True)
@BaseIngestDoc.skip_if_file_exists
@requires_dependencies(dependencies=["slack_sdk"], extras="slack")
def get_file(self):
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
"""Fetches the data from a slack channel and stores it locally."""
self._create_full_tmp_dir_path()
if (
not self.standard_config.re_download
and self._tmp_download_file().is_file()
and os.path.getsize(self._tmp_download_file())
):
if self.config.verbose:
logger.debug(f"File exists: {self._tmp_download_file()}, skipping download")
return
if self.config.verbose:
logger.debug(f"fetching channel {self.channel} - PID: {os.getpid()}")
@@ -141,14 +132,6 @@ class SlackIngestDoc(BaseIngestDoc):
for message in messages:
channel_file.write(message["text"] + "\n")
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w") as output_f:
output_f.write(json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2))
logger.info(f"Wrote {output_filename}")
def convert_datetime(self, date_time):
for format in DATE_FORMATS:
try:
@@ -161,41 +144,15 @@ class SlackIngestDoc(BaseIngestDoc):
"""The filename of the file created from a slack channel"""
return self._tmp_download_file()
def cleanup_file(self):
"""Removes the local copy the file after successful processing."""
if not self.standard_config.preserve_downloads:
if self.config.verbose:
logger.info(f"cleaning up channel {self.channel}")
os.unlink(self._tmp_download_file())
@requires_dependencies(dependencies=["slack_sdk"], extras="slack")
class SlackConnector(BaseConnector):
class SlackConnector(ConnectorCleanupMixin, BaseConnector):
"""Objects of this class support fetching document(s) from"""
config: SimpleSlackConfig
def __init__(self, standard_config: StandardConnectorConfig, config: SimpleSlackConfig):
super().__init__(standard_config, config)
self.cleanup_files = not standard_config.preserve_downloads
def cleanup(self, cur_dir=None):
"""cleanup linginering empty sub-dirs, but leave remaining files
(and their paths) in tact as that indicates they were not processed"""
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.standard_config.download_dir
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
"""Verify that can get metadata for an object, validates connections info."""

View File

@@ -1,4 +1,3 @@
import json
import os
from dataclasses import dataclass, field
from pathlib import Path
@@ -8,6 +7,8 @@ from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
@@ -23,7 +24,7 @@ class SimpleWikipediaConfig(BaseConnectorConfig):
@dataclass
class WikipediaIngestDoc(BaseIngestDoc):
class WikipediaIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
config: SimpleWikipediaConfig = field(repr=False)
page: "WikipediaPage"
@@ -35,48 +36,21 @@ class WikipediaIngestDoc(BaseIngestDoc):
def text(self) -> str:
raise NotImplementedError()
@property
def _output_filename(self):
raise NotImplementedError()
def _create_full_tmp_dir_path(self):
self.filename.parent.mkdir(parents=True, exist_ok=True)
def cleanup_file(self):
"""Removes the local copy of the file (or anything else) after successful processing."""
if not self.standard_config.preserve_downloads and not self.standard_config.download_only:
logger.debug(f"Cleaning up {self}")
os.unlink(self.filename)
@BaseIngestDoc.skip_if_file_exists
def get_file(self):
"""Fetches the "remote" doc and stores it locally on the filesystem."""
self._create_full_tmp_dir_path()
if (
not self.standard_config.re_download
and self.filename.is_file()
and self.filename.stat()
):
logger.debug(f"File exists: {self.filename}, skipping download")
return
logger.debug(f"Fetching {self} - PID: {os.getpid()}")
with open(self.filename, "w", encoding="utf8") as f:
f.write(self.text)
def has_output(self):
"""Determine if structured output for this doc already exists."""
output_filename = self._output_filename()
return output_filename.is_file() and output_filename.stat()
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
if self.standard_config.download_only:
return
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w", encoding="utf8") as output_f:
json.dump(self.isd_elems_no_filename, output_f, ensure_ascii=False, indent=2)
logger.info(f"Wrote {output_filename}")
class WikipediaIngestHTMLDoc(WikipediaIngestDoc):
@property
@@ -90,6 +64,7 @@ class WikipediaIngestHTMLDoc(WikipediaIngestDoc):
def text(self):
return self.page.html()
@property
def _output_filename(self):
return (
Path(self.standard_config.output_dir)
@@ -109,6 +84,7 @@ class WikipediaIngestTextDoc(WikipediaIngestDoc):
def text(self):
return self.page.content
@property
def _output_filename(self):
return (
Path(self.standard_config.output_dir)
@@ -128,6 +104,7 @@ class WikipediaIngestSummaryDoc(WikipediaIngestDoc):
def text(self):
return self.page.summary
@property
def _output_filename(self):
return (
Path(self.standard_config.output_dir)
@@ -135,30 +112,11 @@ class WikipediaIngestSummaryDoc(WikipediaIngestDoc):
)
class WikipediaConnector(BaseConnector):
class WikipediaConnector(ConnectorCleanupMixin, BaseConnector):
config: SimpleWikipediaConfig
def __init__(self, config: SimpleWikipediaConfig, standard_config: StandardConnectorConfig):
super().__init__(standard_config, config)
self.cleanup_files = (
not standard_config.preserve_downloads and not standard_config.download_only
)
def cleanup(self, cur_dir=None):
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.standard_config.download_dir
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
pass

View File

@@ -1,9 +1,13 @@
"""Defines Abstract Base Classes (ABC's) core to batch processing documents
through Unstructured."""
import functools
import json
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import requests
@@ -117,6 +121,11 @@ class BaseIngestDoc(ABC):
def filename(self):
"""The local filename of the document after fetching from remote source."""
@property
@abstractmethod
def _output_filename(self):
"""Filename of the structured output for this doc."""
@property
def record_locator(self) -> Optional[Dict[str, Any]]: # Values must be JSON-serializable
"""A dictionary with any data necessary to uniquely identify the document on
@@ -140,6 +149,20 @@ class BaseIngestDoc(ABC):
"""Removes the local copy the file (or anything else) after successful processing."""
pass
@staticmethod
def skip_if_file_exists(func):
"""Decorator that checks if a file exists, is not empty, and should not re-download,
if so log a message indicating as much and skip the decorated function."""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
if (
not self.standard_config.re_download
and self.filename.is_file()
and self.filename.stat().st_size
):
logger.debug(f"File exists: {self.filename}, skipping {func.__name__}")
return None
return func(self, *args, **kwargs)
return wrapper
# NOTE(crag): Future BaseIngestDoc classes could define get_file_object() methods
# in addition to or instead of get_file()
@abstractmethod
@@ -147,15 +170,18 @@ class BaseIngestDoc(ABC):
"""Fetches the "remote" doc and stores it locally on the filesystem."""
pass
@abstractmethod
def has_output(self) -> bool:
"""Determine if structured output for this doc already exists."""
pass
return self._output_filename.is_file() and self._output_filename.stat().st_size
@abstractmethod
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
pass
if self.standard_config.download_only:
return
self._output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(self._output_filename, "w", encoding="utf8") as output_f:
json.dump(self.isd_elems_no_filename, output_f, ensure_ascii=False, indent=2)
logger.info(f"Wrote {self._output_filename}")
def partition_file(self, **partition_kwargs) -> List[Dict[str, Any]]:
if not self.standard_config.partition_by_api:
@@ -246,3 +272,44 @@ class BaseIngestDoc(ABC):
self.isd_elems_no_filename.append(elem)
return self.isd_elems_no_filename
class ConnectorCleanupMixin:
standard_config: StandardConnectorConfig
def cleanup(self, cur_dir=None):
"""Recursively clean up downloaded files and directories."""
if self.standard_config.preserve_downloads or self.standard_config.download_only:
return
if cur_dir is None:
cur_dir = self.standard_config.download_dir
if cur_dir is None or not Path(cur_dir).is_dir():
return
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
class IngestDocCleanupMixin:
standard_config: StandardConnectorConfig
@property
@abstractmethod
def filename(self):
"""The local filename of the document after fetching from remote source."""
def cleanup_file(self):
"""Removes the local copy of the file after successful processing."""
if (
not self.standard_config.preserve_downloads
and self.filename.is_file()
and not self.standard_config.download_only
):
logger.debug(f"Cleaning up {self}")
os.unlink(self.filename)
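
Two closing notes on the new mixins. First, every adopting class lists the mixin ahead of the ABC (e.g. `BiomedIngestDoc(IngestDocCleanupMixin, BaseIngestDoc)`), so Python's MRO resolves the concrete `cleanup`/`cleanup_file` from the mixin while the abstract base keeps the contract. Second, the `cleanup` traversal prunes empty sub-directories bottom-up and leaves any directory that still holds unprocessed files in place. The following self-contained sketch reproduces that pruning effect without the `os.chdir` bookkeeping the mixin uses; it is an illustration, not code from this commit:

```python
import os
import tempfile
from pathlib import Path

# Build a throwaway "download dir": one empty chain, one dir with a leftover file.
root = Path(tempfile.mkdtemp())
(root / "a" / "b").mkdir(parents=True)                   # fully empty -> pruned
(root / "c").mkdir()
(root / "c" / "unprocessed.txt").write_text("leftover")  # keeps "c" (and root) alive


def prune(cur_dir: str) -> None:
    """Bottom-up removal of empty directories, skipping symlinks."""
    for sub_dir in os.listdir(cur_dir):
        path = os.path.join(cur_dir, sub_dir)
        if os.path.isdir(path) and not os.path.islink(path):
            prune(path)
    if not os.listdir(cur_dir):
        os.rmdir(cur_dir)


prune(str(root))
print(root.exists(), (root / "a").exists(), (root / "c").exists())
# -> True False True: empty dirs are gone, the unprocessed file's path survives
```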