feat/migrate gdrive source connector (#3239)

### Description
Migrate the google drive source connector over to the new v2 ingest
framework and include a variety of improvements as part of the refactor:
* The ID is no longer limited to a drive id but can also be the id of a
subfolder within a drive or a file directly and each case is handled
appropriately
* More metadata is pulled in from Google Drive to enrich the partitioned
elements downstream, and the modified date is now set so that a file is not
reprocessed if the ingest pipeline already has it cached
* Timing information is set on the downloaded file based on the created and
last-modified dates retrieved from Google Drive

---------

Co-authored-by: ryannikolaidis <1208590+ryannikolaidis@users.noreply.github.com>
Co-authored-by: rbiseck3 <rbiseck3@users.noreply.github.com>
This commit is contained in:
Roman Isecke 2024-06-25 08:55:28 -04:00 committed by GitHub
parent e0f4374386
commit 3f581e6b7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 6177 additions and 1957 deletions

View File

@ -1,4 +1,4 @@
## 0.14.9-dev0 ## 0.14.9-dev1
### Enhancements ### Enhancements

View File

@ -1,54 +0,0 @@
[
{
"element_id": "75cc00b615c5db8c8ad38cd0c7e1734e",
"metadata": {
"data_source": {
"date_created": "2023-06-15T06:15:58.931000",
"date_modified": "2023-06-15T06:15:44",
"record_locator": {
"drive_id": "1OQZ66OHBE30rNsNa7dweGLfRmXvkT_jr",
"file_id": "117qrVqiCoR5EjYMsDHGdy3UMkEtKr9Q8"
},
"url": "https://drive.google.com/uc?id=117qrVqiCoR5EjYMsDHGdy3UMkEtKr9Q8&export=download"
},
"emphasized_text_contents": [
"Title"
],
"emphasized_text_tags": [
"b"
],
"filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"languages": [
"eng"
]
},
"text": "Title",
"type": "Title"
},
{
"element_id": "35a183bc208117150a33c20ef4ed0480",
"metadata": {
"data_source": {
"date_created": "2023-06-15T06:15:58.931000",
"date_modified": "2023-06-15T06:15:44",
"record_locator": {
"drive_id": "1OQZ66OHBE30rNsNa7dweGLfRmXvkT_jr",
"file_id": "117qrVqiCoR5EjYMsDHGdy3UMkEtKr9Q8"
},
"url": "https://drive.google.com/uc?id=117qrVqiCoR5EjYMsDHGdy3UMkEtKr9Q8&export=download"
},
"emphasized_text_contents": [
"This is a good reason to continue"
],
"emphasized_text_tags": [
"b"
],
"filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"languages": [
"eng"
]
},
"text": "This is a good reason to continue",
"type": "NarrativeText"
}
]

View File

@ -1,23 +0,0 @@
[
{
"element_id": "f9ff1c965598c4d1bf84bc8457964e9a",
"metadata": {
"data_source": {
"date_created": "2023-06-15T06:15:59.687000",
"date_modified": "2023-06-15T06:15:43",
"record_locator": {
"drive_id": "1OQZ66OHBE30rNsNa7dweGLfRmXvkT_jr",
"file_id": "1SpQuE7jHz9nMt5hfQXsiok1SgIdRYX5o"
},
"url": "https://drive.google.com/uc?id=1SpQuE7jHz9nMt5hfQXsiok1SgIdRYX5o&export=download"
},
"filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"languages": [
"por",
"cat"
]
},
"text": "Lorem ipsum dolor sit amet.",
"type": "Title"
}
]

View File

@ -1,22 +0,0 @@
[
{
"element_id": "bf6cce1b239d7e1481e3b99c07d8cf83",
"metadata": {
"data_source": {
"date_created": "2023-06-15T06:15:59.687000",
"date_modified": "2023-06-15T06:15:39",
"record_locator": {
"drive_id": "1OQZ66OHBE30rNsNa7dweGLfRmXvkT_jr",
"file_id": "1cTKXAreuj-wYmL38nFnqKvz3X8UKcaMC"
},
"url": "https://drive.google.com/uc?id=1cTKXAreuj-wYmL38nFnqKvz3X8UKcaMC&export=download"
},
"filetype": "text/plain",
"languages": [
"eng"
]
},
"text": "three",
"type": "Title"
}
]

View File

@ -0,0 +1,64 @@
[
{
"type": "Title",
"element_id": "56d531394823d81787d77a04462ed096",
"text": "Lorem ipsum dolor sit amet.",
"metadata": {
"languages": [
"por",
"cat"
],
"filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"data_source": {
"url": "https://drive.google.com/uc?id=1SpQuE7jHz9nMt5hfQXsiok1SgIdRYX5o&export=download",
"record_locator": {
"file_id": "1SpQuE7jHz9nMt5hfQXsiok1SgIdRYX5o"
},
"date_created": "1686809759.687",
"date_modified": "1686809743.0",
"permissions_data": [
{
"id": "18298851591250030956",
"displayName": "ingest@unstructured-ingest-test.iam.gserviceaccount.com",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a/ACg8ocJok2KRwwYvrEDkeZVCYosHOMoa52GZa2qIIC1jScCRoFLHaQ=s64",
"emailAddress": "ingest@unstructured-ingest-test.iam.gserviceaccount.com",
"role": "writer",
"deleted": false,
"pendingOwner": false
},
{
"id": "09147371668407854156",
"displayName": "roman",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a-/ALV-UjWoGrFCgXcF6CtiBIBLnAfM68qUnQaJOcgvg3qzfQ3W8Ch6dA=s64",
"emailAddress": "roman@unstructured.io",
"role": "writer",
"deleted": false,
"pendingOwner": false
},
{
"id": "anyoneWithLink",
"type": "anyone",
"kind": "drive#permission",
"role": "reader",
"allowFileDiscovery": false
},
{
"id": "04774006893477068632",
"displayName": "ryan",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a-/ALV-UjXeWpu7QcZuYqIl3p1mwqzS8XGFJ4RqA3Xjljfkm1DcFZ9M7A=s64",
"emailAddress": "ryan@unstructured.io",
"role": "owner",
"deleted": false,
"pendingOwner": false
}
]
}
}
}
]

View File

@ -0,0 +1,64 @@
[
{
"type": "Title",
"element_id": "56d531394823d81787d77a04462ed096",
"text": "Lorem ipsum dolor sit amet.",
"metadata": {
"languages": [
"por",
"cat"
],
"filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"data_source": {
"url": "https://drive.google.com/uc?id=1yXGjX5j0MhKb01vfGRjNJqXyrHBnHxVo&export=download",
"record_locator": {
"file_id": "1yXGjX5j0MhKb01vfGRjNJqXyrHBnHxVo"
},
"date_created": "1718722775.76",
"date_modified": "1718722788.018",
"permissions_data": [
{
"id": "18298851591250030956",
"displayName": "ingest@unstructured-ingest-test.iam.gserviceaccount.com",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a/ACg8ocJok2KRwwYvrEDkeZVCYosHOMoa52GZa2qIIC1jScCRoFLHaQ=s64",
"emailAddress": "ingest@unstructured-ingest-test.iam.gserviceaccount.com",
"role": "writer",
"deleted": false,
"pendingOwner": false
},
{
"id": "04774006893477068632",
"displayName": "ryan",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a-/ALV-UjXeWpu7QcZuYqIl3p1mwqzS8XGFJ4RqA3Xjljfkm1DcFZ9M7A=s64",
"emailAddress": "ryan@unstructured.io",
"role": "writer",
"deleted": false,
"pendingOwner": false
},
{
"id": "anyoneWithLink",
"type": "anyone",
"kind": "drive#permission",
"role": "reader",
"allowFileDiscovery": false
},
{
"id": "09147371668407854156",
"displayName": "roman",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a-/ALV-UjWoGrFCgXcF6CtiBIBLnAfM68qUnQaJOcgvg3qzfQ3W8Ch6dA=s64",
"emailAddress": "roman@unstructured.io",
"role": "owner",
"deleted": false,
"pendingOwner": false
}
]
}
}
}
]

View File

@ -0,0 +1,136 @@
[
{
"type": "Title",
"element_id": "cc23ac9998df1db62b795ec4e5133ab0",
"text": "Title",
"metadata": {
"emphasized_text_contents": [
"Title"
],
"emphasized_text_tags": [
"b"
],
"languages": [
"eng"
],
"filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"data_source": {
"url": "https://drive.google.com/uc?id=117qrVqiCoR5EjYMsDHGdy3UMkEtKr9Q8&export=download",
"record_locator": {
"file_id": "117qrVqiCoR5EjYMsDHGdy3UMkEtKr9Q8"
},
"date_created": "1686809758.931",
"date_modified": "1686809744.0",
"permissions_data": [
{
"id": "18298851591250030956",
"displayName": "ingest@unstructured-ingest-test.iam.gserviceaccount.com",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a/ACg8ocJok2KRwwYvrEDkeZVCYosHOMoa52GZa2qIIC1jScCRoFLHaQ=s64",
"emailAddress": "ingest@unstructured-ingest-test.iam.gserviceaccount.com",
"role": "writer",
"deleted": false,
"pendingOwner": false
},
{
"id": "09147371668407854156",
"displayName": "roman",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a-/ALV-UjWoGrFCgXcF6CtiBIBLnAfM68qUnQaJOcgvg3qzfQ3W8Ch6dA=s64",
"emailAddress": "roman@unstructured.io",
"role": "writer",
"deleted": false,
"pendingOwner": false
},
{
"id": "anyoneWithLink",
"type": "anyone",
"kind": "drive#permission",
"role": "reader",
"allowFileDiscovery": false
},
{
"id": "04774006893477068632",
"displayName": "ryan",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a-/ALV-UjXeWpu7QcZuYqIl3p1mwqzS8XGFJ4RqA3Xjljfkm1DcFZ9M7A=s64",
"emailAddress": "ryan@unstructured.io",
"role": "owner",
"deleted": false,
"pendingOwner": false
}
]
}
}
},
{
"type": "NarrativeText",
"element_id": "9cf7b2535e79eeadfe65a5906eb35f28",
"text": "This is a good reason to continue",
"metadata": {
"emphasized_text_contents": [
"This is a good reason to continue"
],
"emphasized_text_tags": [
"b"
],
"languages": [
"eng"
],
"filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"data_source": {
"url": "https://drive.google.com/uc?id=117qrVqiCoR5EjYMsDHGdy3UMkEtKr9Q8&export=download",
"record_locator": {
"file_id": "117qrVqiCoR5EjYMsDHGdy3UMkEtKr9Q8"
},
"date_created": "1686809758.931",
"date_modified": "1686809744.0",
"permissions_data": [
{
"id": "18298851591250030956",
"displayName": "ingest@unstructured-ingest-test.iam.gserviceaccount.com",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a/ACg8ocJok2KRwwYvrEDkeZVCYosHOMoa52GZa2qIIC1jScCRoFLHaQ=s64",
"emailAddress": "ingest@unstructured-ingest-test.iam.gserviceaccount.com",
"role": "writer",
"deleted": false,
"pendingOwner": false
},
{
"id": "09147371668407854156",
"displayName": "roman",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a-/ALV-UjWoGrFCgXcF6CtiBIBLnAfM68qUnQaJOcgvg3qzfQ3W8Ch6dA=s64",
"emailAddress": "roman@unstructured.io",
"role": "writer",
"deleted": false,
"pendingOwner": false
},
{
"id": "anyoneWithLink",
"type": "anyone",
"kind": "drive#permission",
"role": "reader",
"allowFileDiscovery": false
},
{
"id": "04774006893477068632",
"displayName": "ryan",
"type": "user",
"kind": "drive#permission",
"photoLink": "https://lh3.googleusercontent.com/a-/ALV-UjXeWpu7QcZuYqIl3p1mwqzS8XGFJ4RqA3Xjljfkm1DcFZ9M7A=s64",
"emailAddress": "ryan@unstructured.io",
"role": "owner",
"deleted": false,
"pendingOwner": false
}
]
}
}
}
]

View File

@ -48,6 +48,8 @@ PYTHONPATH=${PYTHONPATH:-.} "$RUN_SCRIPT" \
--verbose \ --verbose \
--drive-id 1OQZ66OHBE30rNsNa7dweGLfRmXvkT_jr \ --drive-id 1OQZ66OHBE30rNsNa7dweGLfRmXvkT_jr \
--service-account-key "$GCP_INGEST_SERVICE_KEY_FILE" \ --service-account-key "$GCP_INGEST_SERVICE_KEY_FILE" \
--recursive \
--extensions "pdf,docx" \
--work-dir "$WORK_DIR" --work-dir "$WORK_DIR"
set +e set +e

View File

@ -1 +1 @@
__version__ = "0.14.9-dev0" # pragma: no cover __version__ = "0.14.9-dev1" # pragma: no cover

View File

@ -34,6 +34,10 @@ class BaseCmd(ABC):
def cmd_name_key(self): def cmd_name_key(self):
return self.cmd_name.replace("-", "_") return self.cmd_name.replace("-", "_")
@property
def cli_cmd_name(self):
    """Return the command name with underscores swapped for dashes."""
    internal_name = self.cmd_name
    return internal_name.replace("_", "-")
@abstractmethod @abstractmethod
def cmd(self, ctx: click.Context, **options) -> None: def cmd(self, ctx: click.Context, **options) -> None:
pass pass

View File

@ -44,7 +44,7 @@ class DestCmd(BaseCmd):
cmd = click.command(fn) cmd = click.command(fn)
if not isinstance(cmd, click.core.Command): if not isinstance(cmd, click.core.Command):
raise ValueError(f"generated command was not of expected type Command: {type(cmd)}") raise ValueError(f"generated command was not of expected type Command: {type(cmd)}")
cmd.name = self.cmd_name cmd.name = self.cli_cmd_name
cmd.short_help = "v2" cmd.short_help = "v2"
cmd.invoke_without_command = True cmd.invoke_without_command = True
extras = [ extras = [

View File

@ -50,7 +50,7 @@ class SrcCmd(BaseCmd):
cmd = click.group(fn, cls=Group) cmd = click.group(fn, cls=Group)
if not isinstance(cmd, click.core.Group): if not isinstance(cmd, click.core.Group):
raise ValueError(f"generated src command was not of expected type Group: {type(cmd)}") raise ValueError(f"generated src command was not of expected type Group: {type(cmd)}")
cmd.name = self.cmd_name cmd.name = self.cli_cmd_name
cmd.short_help = "v2" cmd.short_help = "v2"
cmd.invoke_without_command = True cmd.invoke_without_command = True
extras = [ extras = [

View File

@ -10,6 +10,7 @@ from .fsspec.dropbox import dropbox_dest_cmd, dropbox_src_cmd
from .fsspec.gcs import gcs_dest_cmd, gcs_src_cmd from .fsspec.gcs import gcs_dest_cmd, gcs_src_cmd
from .fsspec.s3 import s3_dest_cmd, s3_src_cmd from .fsspec.s3 import s3_dest_cmd, s3_src_cmd
from .fsspec.sftp import sftp_dest_cmd, sftp_src_cmd from .fsspec.sftp import sftp_dest_cmd, sftp_src_cmd
from .google_drive import google_drive_src_cmd
from .local import local_dest_cmd, local_src_cmd from .local import local_dest_cmd, local_src_cmd
from .weaviate import weaviate_dest_cmd from .weaviate import weaviate_dest_cmd
@ -19,6 +20,7 @@ src_cmds = [
dropbox_src_cmd, dropbox_src_cmd,
elasticsearch_src_cmd, elasticsearch_src_cmd,
gcs_src_cmd, gcs_src_cmd,
google_drive_src_cmd,
local_src_cmd, local_src_cmd,
s3_src_cmd, s3_src_cmd,
sftp_src_cmd, sftp_src_cmd,

View File

@ -0,0 +1,74 @@
from dataclasses import dataclass
import click
from unstructured.ingest.v2.cli.base import SrcCmd
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.cli.utils import DelimitedString, FileOrJson
from unstructured.ingest.v2.processes.connectors.google_drive import CONNECTOR_TYPE
@dataclass
class GoogleDriveCliConnectionConfig(CliConfig):
    """CLI options describing how to connect to Google Drive."""

    @staticmethod
    def get_cli_options() -> list[click.Option]:
        """Return the required ``--drive-id`` and ``--service-account-key`` options."""
        drive_id_option = click.Option(
            ["--drive-id"],
            required=True,
            type=str,
            help="Google Drive File or Folder ID.",
        )
        service_account_key_option = click.Option(
            ["--service-account-key"],
            required=True,
            type=FileOrJson(),
            help="Either the file path of the credentials file to use or a json string of "
            "those values to use for authentication",
        )
        return [drive_id_option, service_account_key_option]
@dataclass
class GoogleDriveCliIndexerConfig(CliConfig):
    """CLI options controlling which Google Drive files get indexed."""

    @staticmethod
    def get_cli_options() -> list[click.Option]:
        """Return the optional ``--extensions`` filter and the ``--recursive`` flag."""
        extensions_option = click.Option(
            ["--extensions"],
            default=None,
            type=DelimitedString(),
            help="Filters the files to be processed based on extension e.g. jpg, docx, etc.",
        )
        recursive_option = click.Option(
            ["--recursive"],
            is_flag=True,
            default=False,
            help="Recursively download files in their respective folders "
            "otherwise stop at the files in provided folder level.",
        )
        return [extensions_option, recursive_option]
@dataclass
class GoogleDriveCliDownloadConfig(CliConfig):
    """CLI options controlling where downloaded Google Drive files are written."""

    @staticmethod
    def get_cli_options() -> list[click.Option]:
        """Return the optional ``--download-dir`` option."""
        options = [
            click.Option(
                ["--download-dir"],
                # The two literals are concatenated implicitly, so the first one
                # must end with a space to keep "at" and the path apart.
                help="Where files are downloaded to, defaults to a location at "
                "`$HOME/.cache/unstructured/ingest/<connector name>/<SHA256>`.",
            ),
        ]
        return options
# Source command for Google Drive: bundles the CLI option providers for the
# connection, indexing and download steps under the connector's type name.
google_drive_src_cmd = SrcCmd(
    cmd_name=CONNECTOR_TYPE,
    connection_config=GoogleDriveCliConnectionConfig,
    indexer_config=GoogleDriveCliIndexerConfig,
    downloader_config=GoogleDriveCliDownloadConfig,
)

View File

@ -30,6 +30,10 @@ from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig from unstructured.ingest.v2.processes.partitioner import PartitionerConfig
class PipelineError(Exception):
    """Raised when a pipeline run did not complete successfully."""

    pass
@dataclass @dataclass
class Pipeline: class Pipeline:
context: ProcessorConfig context: ProcessorConfig
@ -119,6 +123,8 @@ class Pipeline:
finally: finally:
self.log_statuses() self.log_statuses()
self.cleanup() self.cleanup()
if self.context.status:
raise PipelineError("Pipeline did not run successfully")
def clean_results(self, results: Optional[list[Union[Any, list[Any]]]]) -> Optional[list[Any]]: def clean_results(self, results: Optional[list[Union[Any, list[Any]]]]) -> Optional[list[Any]]:
if not results: if not results:

View File

@ -306,9 +306,6 @@ FsspecUploaderConfigT = TypeVar("FsspecUploaderConfigT", bound=FsspecUploaderCon
class FsspecUploader(Uploader): class FsspecUploader(Uploader):
upload_config: FsspecUploaderConfigT = field(default=None) upload_config: FsspecUploaderConfigT = field(default=None)
def is_async(self) -> bool:
return self.fs.async_impl
@property @property
def fs(self) -> "AbstractFileSystem": def fs(self) -> "AbstractFileSystem":
from fsspec import get_filesystem_class from fsspec import get_filesystem_class

View File

@ -0,0 +1,357 @@
import io
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Generator, Optional, Union
from dateutil import parser
from unstructured.documents.elements import DataSourceMetadata
from unstructured.file_utils.google_filetype import GOOGLE_DRIVE_EXPORT_TYPES
from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.error import SourceConnectionNetworkError
from unstructured.ingest.utils.string_and_date_utils import json_to_dict
from unstructured.ingest.v2.interfaces import (
AccessConfig,
ConnectionConfig,
Downloader,
DownloaderConfig,
DownloadResponse,
FileData,
Indexer,
IndexerConfig,
SourceIdentifiers,
download_responses,
)
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.processes.connector_registry import (
SourceRegistryEntry,
add_source_entry,
)
from unstructured.utils import requires_dependencies
CONNECTOR_TYPE = "google_drive"
if TYPE_CHECKING:
from googleapiclient.discovery import Resource as GoogleAPIResource
from googleapiclient.http import MediaIoBaseDownload
@dataclass
class GoogleDriveAccessConfig(AccessConfig):
    """Credentials used to authenticate against Google Drive."""

    # Either a dict of service account info, or a string: a file path to the
    # credentials, or the dict serialized as a string.
    service_account_key: Union[str, dict]
@dataclass
class GoogleDriveConnectionConfig(ConnectionConfig):
    """Connection settings for Google Drive plus a factory for the Drive files client."""

    # Google Drive file or folder ID to ingest.
    drive_id: str
    access_config: GoogleDriveAccessConfig = enhanced_field(sensitive=True)

    @requires_dependencies(["googleapiclient"], extras="google-drive")
    def get_files_service(self) -> "GoogleAPIResource":
        """Build an authenticated Drive v3 client and return its ``files()`` resource.

        Raises:
            TypeError: if ``service_account_key`` is neither a ``str`` nor a ``dict``.
            ValueError: if the key cannot be interpreted, the Drive API reports an
                HTTP error, or default credential discovery fails.
        """
        from google.auth import default, exceptions
        from google.oauth2 import service_account
        from googleapiclient.discovery import build
        from googleapiclient.errors import HttpError

        # Service account key can be a dict or a file path (str),
        # but the dict may come in as a string.
        raw_key = self.access_config.service_account_key
        if isinstance(raw_key, str):
            # NOTE(review): presumably yields a dict for JSON content and leaves a
            # plain file path as a string -- confirm against json_to_dict.
            key = json_to_dict(raw_key)
        elif isinstance(raw_key, dict):
            key = raw_key
        else:
            raise TypeError(
                f"access_config.service_account_key must be "
                f"str or dict, got: {type(raw_key)}"
            )

        try:
            if isinstance(key, dict):
                creds = service_account.Credentials.from_service_account_info(key)
            elif isinstance(key, str):
                # Point google.auth's default credential lookup at the key file.
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key
                creds, _ = default()
            else:
                raise ValueError(
                    f"key path not recognized as a dictionary or a file path: "
                    f"[{type(key)}] {key}",
                )
            service = build("drive", "v3", credentials=creds)
            return service.files()
        except HttpError as exc:
            # Chain explicitly so the original API error stays attached as the cause.
            raise ValueError(f"{exc.reason}") from exc
        except exceptions.DefaultCredentialsError as exc:
            raise ValueError("The provided API key is invalid.") from exc
@dataclass
class GoogleDriveIndexerConfig(IndexerConfig):
    """Indexing options: an optional extension filter and a recursion switch."""

    extensions: Optional[list[str]] = None
    recursive: bool = False

    def __post_init__(self):
        """Normalize the configured extensions by dropping one leading period."""
        if self.extensions is None:
            return
        normalized = []
        for extension in self.extensions:
            if extension.startswith("."):
                extension = extension[1:]
            normalized.append(extension)
        self.extensions = normalized
@dataclass
class GoogleDriveIndexer(Indexer):
    """Lists files below a Google Drive object and maps them to ``FileData`` records."""

    connection_config: GoogleDriveConnectionConfig
    index_config: GoogleDriveIndexerConfig
    # Drive file fields requested for every record that is listed or fetched.
    fields: list[str] = field(
        default_factory=lambda: [
            "id",
            "name",
            "mimeType",
            "fileExtension",
            "md5Checksum",
            "sha1Checksum",
            "sha256Checksum",
            "headRevisionId",
            "permissions",
            "createdTime",
            "modifiedTime",
            "version",
            "originalFilename",
            "capabilities",
            "permissionIds",
            "webViewLink",
            "webContentLink",
        ]
    )

    @staticmethod
    def is_dir(record: dict) -> bool:
        """Return True when the record's mime type is the Google Drive folder type."""
        return record.get("mimeType") == "application/vnd.google-apps.folder"

    @staticmethod
    def map_file_data(f: dict) -> FileData:
        """Convert a Drive file record into a ``FileData``.

        Keys that are mapped onto dedicated fields are popped from ``f`` (the
        record is mutated); whatever remains is attached as ``additional_metadata``.
        """
        file_id = f["id"]
        filename = f.pop("name")
        # Drive only provides webContentLink for files with binary content, so
        # Google-native documents must not fail with a KeyError here.
        url = f.pop("webContentLink", None)
        version = f.pop("version", None)
        permissions = f.pop("permissions", None)
        date_created_dt = parser.parse(f.pop("createdTime"))
        date_modified_dt = parser.parse(f.pop("modifiedTime"))
        parent_path = f.pop("parent_path", None)
        parent_root_path = f.pop("parent_root_path", None)

        if (
            parent_path
            and isinstance(parent_path, str)
            and parent_root_path
            and isinstance(parent_root_path, str)
        ):
            fullpath = f"{parent_path}/{filename}"
            # Strip the root folder only as a leading prefix; str.replace would
            # also delete the same text wherever it reappears deeper in the path.
            if fullpath.startswith(parent_root_path):
                rel_path = fullpath[len(parent_root_path) :]
            else:
                rel_path = fullpath
            source_identifiers = SourceIdentifiers(
                filename=filename, fullpath=fullpath, rel_path=rel_path
            )
        else:
            source_identifiers = SourceIdentifiers(fullpath=filename, filename=filename)

        return FileData(
            connector_type=CONNECTOR_TYPE,
            identifier=file_id,
            source_identifiers=source_identifiers,
            metadata=DataSourceMetadata(
                url=url,
                version=version,
                # Timestamps are stored as stringified epoch seconds.
                date_created=str(date_created_dt.timestamp()),
                date_modified=str(date_modified_dt.timestamp()),
                permissions_data=permissions,
                record_locator={
                    "file_id": file_id,
                },
            ),
            additional_metadata=f,
        )

    def get_paginated_results(
        self,
        files_client,
        object_id: str,
        extensions: Optional[list[str]] = None,
        recursive: bool = False,
        previous_path: Optional[str] = None,
    ) -> list[dict]:
        """Collect all non-folder records whose parent is ``object_id``.

        Follows ``nextPageToken`` until the listing is exhausted. With
        ``recursive`` set, each sub-folder is listed as well, with its name
        appended to ``previous_path``. Every returned record is tagged with
        ``parent_path`` (the folder path it was found in) and
        ``parent_root_path`` (the ``previous_path`` of the outermost call, since
        outer calls overwrite the value set by nested ones).
        """
        fields_input = "nextPageToken, files({})".format(",".join(self.fields))
        q = f"'{object_id}' in parents"
        # Filter by extension but still include any directories
        if extensions:
            ext_filter = " or ".join([f"fileExtension = '{e}'" for e in extensions])
            q = f"{q} and ({ext_filter} or mimeType = 'application/vnd.google-apps.folder')"
        logger.info(f"Query used when indexing: {q}")
        logger.info("response fields limited to: {}".format(", ".join(self.fields)))

        done = False
        page_token = None
        files_response = []
        while not done:
            response: dict = files_client.list(
                spaces="drive",
                fields=fields_input,
                corpora="user",
                pageToken=page_token,
                q=q,
            ).execute()
            if files := response.get("files", []):
                fs = [f for f in files if not self.is_dir(record=f)]
                for r in fs:
                    r["parent_path"] = previous_path
                dirs = [f for f in files if self.is_dir(record=f)]
                files_response.extend(fs)
                if recursive:
                    for d in dirs:
                        dir_id = d["id"]
                        dir_name = d["name"]
                        files_response.extend(
                            self.get_paginated_results(
                                files_client=files_client,
                                object_id=dir_id,
                                extensions=extensions,
                                recursive=recursive,
                                previous_path=f"{previous_path}/{dir_name}",
                            )
                        )
            page_token = response.get("nextPageToken")
            if page_token is None:
                done = True
        for r in files_response:
            r["parent_root_path"] = previous_path
        return files_response

    def get_root_info(self, files_client, object_id: str) -> dict:
        """Fetch the record for ``object_id`` itself, limited to ``self.fields``."""
        return files_client.get(fileId=object_id, fields=",".join(self.fields)).execute()

    def get_files(
        self,
        files_client,
        object_id: str,
        recursive: bool = False,
        extensions: Optional[list[str]] = None,
    ) -> list[FileData]:
        """Return ``FileData`` for ``object_id`` (a file) or for its contents (a folder).

        Each result's ``record_locator`` also receives the requested
        ``object_id`` under the ``drive_id`` key.
        """
        root_info = self.get_root_info(files_client=files_client, object_id=object_id)
        if not self.is_dir(root_info):
            data = [self.map_file_data(root_info)]
        else:
            file_contents = self.get_paginated_results(
                files_client=files_client,
                object_id=object_id,
                extensions=extensions,
                recursive=recursive,
                previous_path=root_info["name"],
            )
            data = [self.map_file_data(f=f) for f in file_contents]
        for d in data:
            # Must be an assignment: the former `x[...]: value` form was only an
            # annotation and never stored the drive id.
            d.metadata.record_locator["drive_id"] = object_id
        return data

    def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
        """Yield a ``FileData`` for every file found under the configured drive id."""
        yield from self.get_files(
            files_client=self.connection_config.get_files_service(),
            object_id=self.connection_config.drive_id,
            recursive=self.index_config.recursive,
            extensions=self.index_config.extensions,
        )
@dataclass
class GoogleDriveDownloaderConfig(DownloaderConfig):
    """Downloader configuration for Google Drive; adds no fields of its own."""

    pass
@dataclass
class GoogleDriveDownloader(Downloader):
    """Downloads (or exports) a single Google Drive file to the local download directory."""

    connection_config: GoogleDriveConnectionConfig
    download_config: GoogleDriveDownloaderConfig = field(
        default_factory=lambda: GoogleDriveDownloaderConfig()
    )
    connector_type: str = CONNECTOR_TYPE

    def get_download_path(self, file_data: FileData) -> Path:
        """Return the local target path: the download dir joined with the relative path."""
        rel_path = file_data.source_identifiers.relative_path
        # A leading "/" would make the joined path absolute and discard download_dir.
        rel_path = rel_path[1:] if rel_path.startswith("/") else rel_path
        # NOTE(review): download_dir is not defined in this class; presumably it
        # comes from the Downloader base class -- confirm.
        return self.download_dir / Path(rel_path)

    @SourceConnectionNetworkError.wrap
    def _get_content(self, downloader: "MediaIoBaseDownload") -> bool:
        """Pull chunks until the downloader reports completion; return that flag."""
        downloaded = False
        while downloaded is False:
            _, downloaded = downloader.next_chunk()
        return downloaded

    @staticmethod
    def is_float(value: str):
        """Return True when ``value`` can be converted with ``float()``."""
        try:
            float(value)
            return True
        except ValueError:
            return False

    def _write_file(self, file_data: FileData, file_contents: io.BytesIO) -> DownloadResponse:
        """Write the downloaded bytes to disk and stamp the file with the Drive timestamps."""
        download_path = self.get_download_path(file_data=file_data)
        download_path.parent.mkdir(parents=True, exist_ok=True)
        logger.info(f"writing {file_data.source_identifiers.fullpath} to {download_path}")
        with open(download_path, "wb") as handler:
            handler.write(file_contents.getbuffer())
        if (
            file_data.metadata.date_modified
            and self.is_float(file_data.metadata.date_modified)
            and file_data.metadata.date_created
            and self.is_float(file_data.metadata.date_created)
        ):
            date_modified = float(file_data.metadata.date_modified)
            date_created = float(file_data.metadata.date_created)
            # os.utime takes (atime, mtime): the created time is stored as the
            # access time and the modified time as the modification time.
            os.utime(download_path, times=(date_created, date_modified))
        return DownloadResponse(file_data=file_data, path=download_path)

    @requires_dependencies(["googleapiclient"], extras="google-drive")
    def run(self, file_data: FileData, **kwargs: Any) -> download_responses:
        """Fetch one file from Google Drive and write it to disk.

        Files whose mime type starts with ``application/vnd.google-apps`` are
        exported using the type mapped in ``GOOGLE_DRIVE_EXPORT_TYPES``; all
        other files are downloaded as-is.

        Raises:
            TypeError: if a Google-native file has no mapped export type.
        """
        from googleapiclient.http import MediaIoBaseDownload

        logger.info(f"fetching file: {file_data.source_identifiers.fullpath}")
        mime_type = file_data.additional_metadata["mimeType"]
        record_id = file_data.identifier
        files_client = self.connection_config.get_files_service()
        if mime_type.startswith("application/vnd.google-apps"):
            # Look up the export type from the file's own mime type; this class
            # has no `meta` attribute, so reading it raised AttributeError.
            export_mime = GOOGLE_DRIVE_EXPORT_TYPES.get(mime_type)
            if not export_mime:
                raise TypeError(
                    f"File not supported. Name: {file_data.source_identifiers.filename} "
                    f"ID: {record_id} "
                    f"MimeType: {mime_type}"
                )

            request = files_client.export_media(
                fileId=record_id,
                mimeType=export_mime,
            )
        else:
            request = files_client.get_media(fileId=record_id)

        file_contents = io.BytesIO()
        downloader = MediaIoBaseDownload(file_contents, request)
        downloaded = self._get_content(downloader=downloader)
        # A BytesIO object is always truthy, so only the completion flag can
        # signal that nothing was downloaded.
        if not downloaded:
            return []
        return self._write_file(file_data=file_data, file_contents=file_contents)
# Register the Google Drive connector's config, indexer and downloader classes
# in the source registry under its connector type.
add_source_entry(
    source_type=CONNECTOR_TYPE,
    entry=SourceRegistryEntry(
        connection_config=GoogleDriveConnectionConfig,
        indexer_config=GoogleDriveIndexerConfig,
        indexer=GoogleDriveIndexer,
        downloader_config=GoogleDriveDownloaderConfig,
        downloader=GoogleDriveDownloader,
    ),
)

View File

@ -173,7 +173,12 @@ class LocalUploader(Uploader):
for content in contents: for content in contents:
if source_identifiers := content.file_data.source_identifiers: if source_identifiers := content.file_data.source_identifiers:
identifiers = source_identifiers identifiers = source_identifiers
new_path = self.upload_config.output_path / identifiers.relative_path rel_path = (
identifiers.relative_path[1:]
if identifiers.relative_path.startswith("/")
else identifiers.relative_path
)
new_path = self.upload_config.output_path / Path(rel_path)
final_path = str(new_path).replace( final_path = str(new_path).replace(
identifiers.filename, f"{identifiers.filename}.json" identifiers.filename, f"{identifiers.filename}.json"
) )