Connector for Google Drive (#294)

Implements issue #244
This commit is contained in:
Habeeb Shopeju 2023-03-07 06:01:02 +00:00 committed by GitHub
parent 905e4ae8f6
commit 4117f57e14
11 changed files with 611 additions and 4 deletions

View File

@ -1,4 +1,4 @@
## 0.5.3-dev1
## 0.5.3-dev2
### Enhancements
@ -7,6 +7,7 @@
* Add `--wikipedia-auto-suggest` argument to the ingest CLI to disable automatic redirection
to pages with similar names.
* Add optional `encoding` argument to the `partition_(text/email/html)` functions.
* Added Google Drive connector for ingest cli.
### Fixes

View File

@ -68,6 +68,7 @@ In checklist form, the above steps are summarized as:
- [ ] Update the Makefile, adding a target for `install-ingest-<name>` and adding another `pip-compile` line to the `pip-compile` make target. See [this commit](https://github.com/Unstructured-IO/unstructured/commit/ab542ca3c6274f96b431142262d47d727f309e37) for a reference.
- [ ] The added dependencies should be imported at runtime when the new connector is invoked, rather than as top-level imports.
- [ ] Add the decorator `unstructured.utils.requires_dependencies` on top of each class or function that uses those connector-specific dependencies, e.g. for the `S3Connector` it should look like `@requires_dependencies(dependencies=["boto3"], extras="s3")` (see the sketch after this checklist).
- [ ] Run `make tidy` and `make check` to ensure linting checks pass.
- [ ] Honors the conventions of `BaseConnectorConfig` defined in [unstructured/ingest/interfaces.py](unstructured/ingest/interfaces.py) which is passed through [the CLI](unstructured/ingest/main.py):
- [ ] If running with an `.output_dir` where structured outputs already exist for a given file, the file content is neither re-downloaded from the data source nor reprocessed. This is made possible by implementing the call to `MyIngestDoc.has_output()`, which is invoked in [MainProcess._filter_docs_with_outputs](ingest-prep-for-many/unstructured/ingest/main.py).
- [ ] Unless `.reprocess` is `True`, in which case documents are always reprocessed.
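
For a concrete picture of the dependency and `has_output()` conventions above, here is a minimal sketch modeled on the Google Drive connector added in this PR; the bodies are illustrative only, not the actual implementation:

```python
from unstructured.ingest.interfaces import BaseIngestDoc
from unstructured.utils import requires_dependencies


@requires_dependencies(dependencies=["googleapiclient"], extras="google-drive")
def create_drive_service(key_path: str):
    # Connector-specific imports live inside the decorated callable so that
    # `unstructured` still imports cleanly when the extra is not installed.
    from googleapiclient.discovery import build  # noqa: F401
    ...


class MyIngestDoc(BaseIngestDoc):
    def has_output(self) -> bool:
        # Lets MainProcess._filter_docs_with_outputs skip files whose structured
        # output already exists under .output_dir.
        output_filename = self._output_filename()  # helper defined by the connector
        return output_filename.is_file() and bool(output_filename.stat())
```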

View File

@ -49,6 +49,10 @@ install-dev:
install-build:
pip install -r requirements/build.txt
.PHONY: install-ingest-google-drive
install-ingest-google-drive:
pip install -r requirements/ingest-google-drive.txt
## install-ingest-s3: install requirements for the s3 connector
.PHONY: install-ingest-s3
install-ingest-s3:
@ -98,6 +102,7 @@ pip-compile:
pip-compile --upgrade --extra=reddit --output-file=requirements/ingest-reddit.txt requirements/base.txt setup.py
pip-compile --upgrade --extra=github --output-file=requirements/ingest-github.txt requirements/base.txt setup.py
pip-compile --upgrade --extra=wikipedia --output-file=requirements/ingest-wikipedia.txt requirements/base.txt setup.py
pip-compile --upgrade --extra=google-drive --output-file=requirements/ingest-google-drive.txt requirements/base.txt setup.py
## install-project-local: install unstructured into your local python environment
.PHONY: install-project-local

View File

@ -0,0 +1,36 @@
#!/usr/bin/env bash
# Processes the documents in a Google Drive file or folder
# through Unstructured's library in 2 processes.
# Structured outputs are stored in google-drive-ingest-output/
# NOTE: this script is not ready to run as-is!
# You must enter a Drive ID and a Drive Service Account Key before running.
# You can find out how to create a Service Account Key here:
# https://developers.google.com/workspace/guides/create-credentials#service-account
# The file or folder ID can be taken from the URL of the file or folder, e.g.:
# https://drive.google.com/drive/folders/{folder-id}
# https://drive.google.com/file/d/{file-id}
# NOTE: Using the Service Account Key only works when the file or folder
# is shared with at least "Anyone with the link" view permission,
# OR the service account's email address has been given access to the file
# or folder.
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd "$SCRIPT_DIR"/../../.. || exit 1
PYTHONPATH=. ./unstructured/ingest/main.py \
--drive-id "<file or folder id>" \
--drive-service-account-key "<path to drive service account key>" \
--structured-output-dir google-drive-ingest-output \
--num-processes 2 \
--drive-recursive \
--verbose \
# --drive-extension ".docx" # Ensures only .docx files are processed.
# Alternatively, you can call it using:
# unstructured-ingest --drive-id ...
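
For reference, the credential check the connector performs can be reproduced directly with the Google client libraries. A minimal Python sketch, mirroring `create_service_account_object` in the connector module; the key path and ID are placeholders:

```python
import os

from google.auth import default
from googleapiclient.discovery import build

# Placeholders: point at a real service account key and file/folder ID.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<path to drive service account key>"
creds, _ = default()
service = build("drive", "v3", credentials=creds)

# Listing children of the target ID fails fast if the key or sharing is wrong.
response = (
    service.files()
    .list(spaces="drive", fields="files(id, name)", q="'<file or folder id>' in parents")
    .execute()
)
print(response.get("files", []))
```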

View File

@ -0,0 +1,218 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile --extra=google-drive --output-file=requirements/ingest-google-drive.txt requirements/base.txt setup.py
#
anyio==3.6.2
# via
# -r requirements/base.txt
# httpcore
argilla==1.3.1
# via
# -r requirements/base.txt
# unstructured (setup.py)
backoff==2.2.1
# via
# -r requirements/base.txt
# argilla
cachetools==5.3.0
# via google-auth
certifi==2022.12.7
# via
# -r requirements/base.txt
# httpcore
# httpx
# requests
# unstructured (setup.py)
charset-normalizer==3.0.1
# via
# -r requirements/base.txt
# requests
click==8.1.3
# via
# -r requirements/base.txt
# nltk
deprecated==1.2.13
# via
# -r requirements/base.txt
# argilla
et-xmlfile==1.1.0
# via
# -r requirements/base.txt
# openpyxl
google-api-core==2.11.0
# via google-api-python-client
google-api-python-client==2.80.0
# via unstructured (setup.py)
google-auth==2.16.2
# via
# google-api-core
# google-api-python-client
# google-auth-httplib2
google-auth-httplib2==0.1.0
# via google-api-python-client
googleapis-common-protos==1.58.0
# via google-api-core
h11==0.14.0
# via
# -r requirements/base.txt
# httpcore
httpcore==0.16.3
# via
# -r requirements/base.txt
# httpx
httplib2==0.21.0
# via
# google-api-python-client
# google-auth-httplib2
httpx==0.23.3
# via
# -r requirements/base.txt
# argilla
idna==3.4
# via
# -r requirements/base.txt
# anyio
# requests
# rfc3986
importlib-metadata==6.0.0
# via
# -r requirements/base.txt
# markdown
joblib==1.2.0
# via
# -r requirements/base.txt
# nltk
lxml==4.9.2
# via
# -r requirements/base.txt
# python-docx
# python-pptx
# unstructured (setup.py)
markdown==3.4.1
# via
# -r requirements/base.txt
# unstructured (setup.py)
monotonic==1.6
# via
# -r requirements/base.txt
# argilla
nltk==3.8.1
# via
# -r requirements/base.txt
# unstructured (setup.py)
numpy==1.23.5
# via
# -r requirements/base.txt
# argilla
# pandas
openpyxl==3.1.1
# via
# -r requirements/base.txt
# unstructured (setup.py)
packaging==23.0
# via
# -r requirements/base.txt
# argilla
pandas==1.5.3
# via
# -r requirements/base.txt
# argilla
# unstructured (setup.py)
pillow==9.4.0
# via
# -r requirements/base.txt
# python-pptx
# unstructured (setup.py)
protobuf==4.22.0
# via
# google-api-core
# googleapis-common-protos
pyasn1==0.4.8
# via
# pyasn1-modules
# rsa
pyasn1-modules==0.2.8
# via google-auth
pydantic==1.10.5
# via
# -r requirements/base.txt
# argilla
pyparsing==3.0.9
# via httplib2
python-dateutil==2.8.2
# via
# -r requirements/base.txt
# pandas
python-docx==0.8.11
# via
# -r requirements/base.txt
# unstructured (setup.py)
python-magic==0.4.27
# via
# -r requirements/base.txt
# unstructured (setup.py)
python-pptx==0.6.21
# via
# -r requirements/base.txt
# unstructured (setup.py)
pytz==2022.7.1
# via
# -r requirements/base.txt
# pandas
regex==2022.10.31
# via
# -r requirements/base.txt
# nltk
requests==2.28.2
# via
# -r requirements/base.txt
# google-api-core
# unstructured (setup.py)
rfc3986[idna2008]==1.5.0
# via
# -r requirements/base.txt
# httpx
rsa==4.9
# via google-auth
six==1.16.0
# via
# -r requirements/base.txt
# google-auth
# google-auth-httplib2
# python-dateutil
sniffio==1.3.0
# via
# -r requirements/base.txt
# anyio
# httpcore
# httpx
tqdm==4.64.1
# via
# -r requirements/base.txt
# argilla
# nltk
typing-extensions==4.5.0
# via
# -r requirements/base.txt
# pydantic
uritemplate==4.1.1
# via google-api-python-client
urllib3==1.26.14
# via
# -r requirements/base.txt
# requests
wrapt==1.14.1
# via
# -r requirements/base.txt
# argilla
# deprecated
xlsxwriter==3.0.8
# via
# -r requirements/base.txt
# python-pptx
zipp==3.15.0
# via
# -r requirements/base.txt
# importlib-metadata

View File

@ -85,6 +85,7 @@ setup(
],
"reddit": ["praw"],
"wikipedia": ["wikipedia"],
"google-drive": ["google-api-python-client"],
},
package_dir={"unstructured": "unstructured"},
package_data={"unstructured": ["nlp/*.txt"]},

View File

@ -1 +1 @@
__version__ = "0.5.3-dev1" # pragma: no cover
__version__ = "0.5.3-dev2" # pragma: no cover

View File

@ -218,10 +218,11 @@ def detect_filetype(
with open(filename, "rb") as f:
filetype = _detect_filetype_from_octet_stream(file=f)
extension = extension if extension else ""
if filetype == FileType.UNK:
return FileType.ZIP
return EXT_TO_FILETYPE.get(extension.lower(), FileType.ZIP)
else:
return filetype
return EXT_TO_FILETYPE.get(extension.lower(), filetype)
logger.warn(
f"MIME type was {mime_type}. This file type is not currently supported in unstructured.",

View File

@ -0,0 +1,9 @@
GOOGLE_DRIVE_EXPORT_TYPES = {
"application/vnd.google-apps.document": "application/"
"vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.google-apps.spreadsheet": "application/"
"vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.google-apps.presentation": "application/"
"vnd.openxmlformats-officedocument.presentationml.presentation",
"application/vnd.google-apps.photo": "image/jpeg",
}
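
A small usage sketch of this mapping as the connector applies it (the mimeType is an example value): native Google formats cannot be downloaded as raw bytes, so they are exported to their Office or JPEG equivalent, and anything missing from the table is skipped.

```python
from unstructured.file_utils.google_filetype import GOOGLE_DRIVE_EXPORT_TYPES

# Example: a native Google Doc is exported as an Office .docx document.
mime_type = "application/vnd.google-apps.document"
export_mime = GOOGLE_DRIVE_EXPORT_TYPES.get(mime_type)

if export_mime is None:
    print(f"Unsupported Google Drive type: {mime_type}")  # the connector skips these
else:
    print(export_mime)  # application/vnd.openxmlformats-officedocument.wordprocessingml.document
```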

View File

@ -0,0 +1,286 @@
import io
import json
import os
from dataclasses import dataclass
from mimetypes import guess_extension
from pathlib import Path
from typing import Dict
from unstructured.file_utils.filetype import EXT_TO_FILETYPE
from unstructured.file_utils.google_filetype import GOOGLE_DRIVE_EXPORT_TYPES
from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
)
from unstructured.utils import requires_dependencies
FILE_FORMAT = "{id}-{name}{ext}"
DIRECTORY_FORMAT = "{id}-{name}"
@requires_dependencies(["googleapiclient"], extras="google-drive")
def create_service_account_object(key_path, id=None):
"""
Creates a service object for interacting with Google Drive.
Providing a drive ID validates the provided key against that file or folder.
Args:
key_path: Path to Google Drive service account json file.
id: ID of a file on Google Drive. File has to be either publicly accessible or accessible
to the service account.
Returns:
Service account object
"""
from google.auth import default, exceptions
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
try:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key_path
creds, _ = default()
service = build("drive", "v3", credentials=creds)
if id:
service.files().list(
spaces="drive",
fields="files(id)",
pageToken=None,
corpora="user",
q=f"'{id}' in parents",
).execute()
except HttpError as exc:
raise ValueError(f"{exc.reason}")
except exceptions.DefaultCredentialsError:
raise ValueError("The provided API key is invalid.")
return service
@dataclass
class SimpleGoogleDriveConfig(BaseConnectorConfig):
"""Connector config where drive_id is the id of the document to process or
the folder to process all documents from."""
# Google Drive Specific Options
drive_id: str
service_account_key: str
extension: str
# Standard Connector options
download_dir: str
# where to write structured data, with the directory structure matching drive path
output_dir: str
re_download: bool = False
preserve_downloads: bool = False
verbose: bool = False
recursive: bool = False
def __post_init__(self):
if self.extension and self.extension not in EXT_TO_FILETYPE.keys():
raise ValueError(
f"Extension not supported. "
f"Value MUST be one of {', '.join(EXT_TO_FILETYPE.keys())}.",
)
self.service = create_service_account_object(self.service_account_key, self.drive_id)
@dataclass
class GoogleDriveIngestDoc(BaseIngestDoc):
config: SimpleGoogleDriveConfig
file_meta: Dict
@property
def filename(self):
return Path(self.file_meta.get("download_filepath")).resolve() # type: ignore
def _output_filename(self):
return Path(f"{self.file_meta.get('output_filepath')}.json").resolve()
def cleanup_file(self):
if not self.config.preserve_downloads and self.filename.is_file():
if self.config.verbose:
print(f"cleaning up {self}")
Path.unlink(self.filename)
def has_output(self):
"""Determine if structured output for this doc already exists."""
output_filename = self._output_filename()
return output_filename.is_file() and output_filename.stat()
@requires_dependencies(["googleapiclient"], extras="google-drive")
def get_file(self):
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
if not self.config.re_download and self.filename.is_file() and self.filename.stat():
if self.config.verbose:
print(f"File exists: {self.filename}, skipping download")
return
self.config.service = create_service_account_object(self.config.service_account_key)
if self.file_meta.get("mimeType", "").startswith("application/vnd.google-apps"):
export_mime = GOOGLE_DRIVE_EXPORT_TYPES.get(
self.file_meta.get("mimeType"), # type: ignore
)
if not export_mime:
print(
f"File not supported. Name: {self.file_meta.get('name')} "
f"ID: {self.file_meta.get('id')} "
f"MimeType: {self.file_meta.get('mimeType')}",
)
return
request = self.config.service.files().export_media(
fileId=self.file_meta.get("id"),
mimeType=export_mime,
)
else:
request = self.config.service.files().get_media(fileId=self.file_meta.get("id"))
file = io.BytesIO()
downloader = MediaIoBaseDownload(file, request)
downloaded = False
try:
while downloaded is False:
status, downloaded = downloader.next_chunk()
except HttpError:
pass
saved = False
if downloaded and file:
dir_ = self.file_meta.get("download_dir")
if dir_:
if not dir_.is_dir():
if self.config.verbose:
print(f"Creating directory: {self.file_meta.get('download_dir')}")
if dir_:
dir_.mkdir(parents=True, exist_ok=True)
with open(self.filename, "wb") as handler:
handler.write(file.getbuffer())
saved = True
if self.config.verbose:
print(f"File downloaded: {self.filename}.")
if not saved:
print(f"Error while downloading and saving file: {self.filename}.")
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w") as output_f:
output_f.write(json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2))
print(f"Wrote {output_filename}")
class GoogleDriveConnector(BaseConnector):
"""Objects of this class support fetching documents from Google Drive"""
def __init__(self, config):
self.config = config
self.cleanup_files = not self.config.preserve_downloads
def _list_objects(self, drive_id, recursive=False):
files = []
def traverse(drive_id, download_dir, output_dir, recursive=False):
page_token = None
while True:
response = (
self.config.service.files()
.list(
spaces="drive",
fields="nextPageToken, files(id, name, mimeType)",
pageToken=page_token,
corpora="user",
q=f"'{drive_id}' in parents",
)
.execute()
)
for meta in response.get("files", []):
if meta.get("mimeType") == "application/vnd.google-apps.folder":
dir_ = DIRECTORY_FORMAT.format(name=meta.get("name"), id=meta.get("id"))
if recursive:
download_sub_dir = (download_dir / dir_).resolve()
output_sub_dir = (output_dir / dir_).resolve()
traverse(meta.get("id"), download_sub_dir, output_sub_dir, True)
else:
ext = ""
if not Path(meta.get("name")).suffixes:
guess = guess_extension(meta.get("mimeType"))
ext = guess if guess else ext
if meta.get("mimeType", "").startswith("application/vnd.google-apps"):
export_mime = GOOGLE_DRIVE_EXPORT_TYPES.get(meta.get("mimeType"))
if not export_mime:
print(
f"File {meta.get('name')} has an "
f"unsupported MimeType {meta.get('mimeType')}",
)
continue
if not ext:
guess = guess_extension(export_mime)
ext = guess if guess else ext
# TODO (Habeeb): Consider filtering at the query level.
if self.config.extension and self.config.extension != ext: # noqa: SIM102
if self.config.verbose:
print(
f"File {meta.get('name')} does not match "
f"the file type {self.config.extension}",
)
continue
name = FILE_FORMAT.format(name=meta.get("name"), id=meta.get("id"), ext=ext)
meta["download_dir"] = download_dir
meta["download_filepath"] = (download_dir / name).resolve()
meta["output_dir"] = output_dir
meta["output_filepath"] = (output_dir / name).resolve()
files.append(meta)
page_token = response.get("nextPageToken", None)
if page_token is None:
break
traverse(drive_id, Path(self.config.download_dir), Path(self.config.output_dir), recursive)
return files
def cleanup(self, cur_dir=None):
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.config.download_dir
if cur_dir is None or not Path(cur_dir).is_dir():
return
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
pass
def get_ingest_docs(self):
files = self._list_objects(self.config.drive_id, self.config.recursive)
# Setting to None because service object can't be pickled for multiprocessing.
self.config.service = None
return [GoogleDriveIngestDoc(self.config, file) for file in files]
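
For library users bypassing the CLI, a minimal sketch of wiring these classes up directly; it mirrors what `main.py` does below, and the ID, key path, and directories are placeholders:

```python
from unstructured.ingest.connector.google_drive import (
    GoogleDriveConnector,
    SimpleGoogleDriveConfig,
)

# Placeholders: a real file/folder ID and service account key are required,
# since SimpleGoogleDriveConfig validates the key on construction.
config = SimpleGoogleDriveConfig(
    drive_id="<file or folder id>",
    service_account_key="<path to drive service account key>",
    extension=None,
    download_dir="google-drive-ingest-download",
    output_dir="google-drive-ingest-output",
    recursive=True,
    verbose=True,
)

connector = GoogleDriveConnector(config=config)
connector.initialize()
docs = connector.get_ingest_docs()  # one GoogleDriveIngestDoc per matching file
```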

View File

@ -7,6 +7,10 @@ from pathlib import Path
import click
from unstructured.ingest.connector.github import GitHubConnector, SimpleGitHubConfig
from unstructured.ingest.connector.google_drive import (
GoogleDriveConnector,
SimpleGoogleDriveConfig,
)
from unstructured.ingest.connector.reddit import RedditConnector, SimpleRedditConfig
from unstructured.ingest.connector.s3_connector import S3Connector, SimpleS3Config
from unstructured.ingest.connector.wikipedia import (
@ -84,6 +88,28 @@ class MainProcess:
default=False,
help="Connect to s3 without local AWS credentials.",
)
@click.option(
"--drive-id",
default=None,
help="Google Drive File or Folder ID.",
)
@click.option(
"--drive-service-account-key",
default=None,
help="Path to the Google Drive service account json file.",
)
@click.option(
"--drive-recursive",
is_flag=True,
default=False,
help="Recursively download files in folders from the Google Drive ID, "
"otherwise stop at the files in provided folder level.",
)
@click.option(
"--drive-extension",
default=None,
help="Filters the files to be processed based on extension e.g. .jpg, .docx, etc.",
)
@click.option(
"--wikipedia-page-title",
default=None,
@ -187,6 +213,10 @@ class MainProcess:
@click.option("-v", "--verbose", is_flag=True, default=False)
def main(
s3_url,
drive_id,
drive_service_account_key,
drive_recursive,
drive_extension,
wikipedia_page_title,
wikipedia_auto_suggest,
github_url,
@ -228,6 +258,10 @@ def main(
hashed_dir_name = hashlib.sha256(
wikipedia_page_title.encode("utf-8"),
)
elif drive_id:
hashed_dir_name = hashlib.sha256(
drive_id.encode("utf-8"),
)
else:
raise ValueError("No connector-specific option was specified!")
download_dir = cache_path / hashed_dir_name.hexdigest()[:10]
@ -294,6 +328,21 @@ def main(
verbose=verbose,
),
)
elif drive_id:
doc_connector = GoogleDriveConnector( # type: ignore
config=SimpleGoogleDriveConfig(
drive_id=drive_id,
service_account_key=drive_service_account_key,
recursive=drive_recursive,
extension=drive_extension,
# defaults params:
download_dir=download_dir,
preserve_downloads=preserve_downloads,
output_dir=structured_output_dir,
re_download=re_download,
verbose=verbose,
),
)
# Check for other connector-specific options here and define the doc_connector object
# e.g. "elif azure_container: ..."