support passing credentials from memory for google connectors (#1888)

### Description

### Google Drive
The existing service account parameter was expanded to support either a
file path or a json value to generate the credentials when instantiating
the google drive client.

### GCS
Google Cloud Storage already supports the value being passed in, from
their docstring:
> - you may supply a token generated by the
      [gcloud](https://cloud.google.com/sdk/docs/)
      utility; this is either a python dictionary, the name of a file
containing the JSON returned by logging in with the gcloud CLI tool,
      or a Credentials object.


I tested this locally:

```python
from gcsfs import GCSFileSystem
import json

with open("/Users/romanisecke/.ssh/google-cloud-unstructured-ingest-test-d4fc30286d9d.json") as json_file:
    json_data = json.load(json_file)
    print(json_data)

    fs = GCSFileSystem(token=json_data)
    print(fs.ls(path="gs://utic-test-ingest-fixtures/"))
```
`['utic-test-ingest-fixtures/ideas-page.html',
'utic-test-ingest-fixtures/nested-1',
'utic-test-ingest-fixtures/nested-2']`
This commit is contained in:
Roman Isecke 2023-10-31 13:12:04 -04:00 committed by GitHub
parent 922bc84cee
commit 123ad20f4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 68 additions and 29 deletions

View File

@ -1,9 +1,10 @@
## 0.10.29-dev2 ## 0.10.29-dev3
### Enhancements ### Enhancements
* **Add include_header argument for partition_csv and partition_tsv** Now supports retaining header rows in CSV and TSV documents element partitioning. * **Add include_header argument for partition_csv and partition_tsv** Now supports retaining header rows in CSV and TSV documents element partitioning.
* **Add retry logic for all source connectors** All http calls being made by the ingest source connectors have been isolated and wrapped by the `SourceConnectionNetworkError` custom error, which triggers the retry logic, if enabled, in the ingest pipeline. * **Add retry logic for all source connectors** All http calls being made by the ingest source connectors have been isolated and wrapped by the `SourceConnectionNetworkError` custom error, which triggers the retry logic, if enabled, in the ingest pipeline.
* **Google Drive source connector supports credentials from memory** Originally, the connector expected a filepath to pull the credentials from when creating the client. This was expanded to support passing that information from memory as a dict if access to the file system might not be available.
### Features ### Features

View File

@ -49,7 +49,7 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
--input-path example-docs/fake-memo.pdf \ --input-path example-docs/fake-memo.pdf \
--work-dir "$WORK_DIR" \ --work-dir "$WORK_DIR" \
gcs \ gcs \
--token "$GCP_INGEST_SERVICE_KEY_FILE" \ --service-account-key "$GCP_INGEST_SERVICE_KEY_FILE" \
--remote-url "$DESTINATION_GCS" --remote-url "$DESTINATION_GCS"
# Simply check the number of files uploaded # Simply check the number of files uploaded

View File

@ -40,7 +40,7 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
--reprocess \ --reprocess \
--output-dir "$OUTPUT_DIR" \ --output-dir "$OUTPUT_DIR" \
--verbose \ --verbose \
--token "$GCP_INGEST_SERVICE_KEY_FILE" \ --service-account-key "$GCP_INGEST_SERVICE_KEY_FILE" \
--recursive \ --recursive \
--remote-url gs://utic-test-ingest-fixtures/ \ --remote-url gs://utic-test-ingest-fixtures/ \
--work-dir "$WORK_DIR" --work-dir "$WORK_DIR"

View File

@ -1 +1 @@
__version__ = "0.10.29-dev2" # pragma: no cover __version__ = "0.10.29-dev3" # pragma: no cover

View File

@ -4,9 +4,7 @@ from dataclasses import dataclass
import click import click
from unstructured.ingest.cli.base.src import BaseSrcCmd from unstructured.ingest.cli.base.src import BaseSrcCmd
from unstructured.ingest.cli.interfaces import ( from unstructured.ingest.cli.interfaces import CliMixin, FileOrJson
CliMixin,
)
from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.interfaces import BaseConfig
CMD_NAME = "gcs" CMD_NAME = "gcs"
@ -14,17 +12,17 @@ CMD_NAME = "gcs"
@dataclass @dataclass
class GcsCliConfig(BaseConfig, CliMixin): class GcsCliConfig(BaseConfig, CliMixin):
token: t.Optional[str] = None service_account_key: t.Optional[t.Union[dict, str]] = None
@staticmethod @staticmethod
def get_cli_options() -> t.List[click.Option]: def get_cli_options() -> t.List[click.Option]:
options = [ options = [
click.Option( click.Option(
["--token"], ["--service-account-key"],
default=None, default=None,
help="Token used to access Google Cloud. GCSFS will attempt to use your " type=FileOrJson(),
"default gcloud creds or get creds from the google metadata service " help="Either the file path of the credentials file to use or a json string of "
"or fall back to anonymous access.", "those values to use for authentication",
), ),
] ]
return options return options

View File

@ -4,17 +4,14 @@ from dataclasses import dataclass
import click import click
from unstructured.ingest.cli.base.src import BaseSrcCmd from unstructured.ingest.cli.base.src import BaseSrcCmd
from unstructured.ingest.cli.interfaces import ( from unstructured.ingest.cli.interfaces import CliMixin, CliRecursiveConfig, FileOrJson
CliMixin,
CliRecursiveConfig,
)
from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.interfaces import BaseConfig
@dataclass @dataclass
class GoogleDriveCliConfig(BaseConfig, CliMixin): class GoogleDriveCliConfig(BaseConfig, CliMixin):
drive_id: str drive_id: str
service_account_key: str service_account_key: t.Union[dict, str]
extension: t.Optional[str] = None extension: t.Optional[str] = None
@staticmethod @staticmethod
@ -29,8 +26,9 @@ class GoogleDriveCliConfig(BaseConfig, CliMixin):
click.Option( click.Option(
["--service-account-key"], ["--service-account-key"],
required=True, required=True,
type=str, type=FileOrJson(),
help="Path to the Google Drive service account json file.", help="Either the file path of the credentials file to use or a json string of "
"those values to use for authentication",
), ),
click.Option( click.Option(
["--extension"], ["--extension"],

View File

@ -1,7 +1,9 @@
import json
import os.path
import typing as t import typing as t
from abc import abstractmethod from abc import abstractmethod
from dataclasses import fields from dataclasses import fields
from gettext import ngettext from gettext import gettext, ngettext
from pathlib import Path from pathlib import Path
import click import click
@ -20,6 +22,33 @@ from unstructured.ingest.interfaces import (
) )
class FileOrJson(click.ParamType):
name = "file-or-json"
def convert(
self,
value: t.Any,
param: t.Optional[click.Parameter],
ctx: t.Optional[click.Context],
) -> t.Any:
# check if valid file
full_path = os.path.abspath(os.path.expanduser(value))
if os.path.isfile(full_path):
return str(Path(full_path).resolve())
if isinstance(value, str):
try:
return json.loads(value)
except json.JSONDecodeError:
pass
self.fail(
gettext(
"{value} is not a valid json string nor an existing filepath.",
).format(value=value),
param,
ctx,
)
class DelimitedString(click.ParamType): class DelimitedString(click.ParamType):
name = "delimited-string" name = "delimited-string"

View File

@ -38,7 +38,7 @@ class GoogleDriveSessionHandle(BaseSessionHandle):
@requires_dependencies(["googleapiclient"], extras="google-drive") @requires_dependencies(["googleapiclient"], extras="google-drive")
def create_service_account_object(key_path, id=None): def create_service_account_object(key_path: t.Union[str, dict], id=None):
""" """
Creates a service object for interacting with Google Drive. Creates a service object for interacting with Google Drive.
@ -53,12 +53,21 @@ def create_service_account_object(key_path, id=None):
Service account object Service account object
""" """
from google.auth import default, exceptions from google.auth import default, exceptions
from google.oauth2 import service_account
from googleapiclient.discovery import build from googleapiclient.discovery import build
from googleapiclient.errors import HttpError from googleapiclient.errors import HttpError
try: try:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key_path if isinstance(key_path, dict):
creds, _ = default() creds = service_account.Credentials.from_service_account_info(key_path)
elif isinstance(key_path, str):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key_path
creds, _ = default()
else:
raise ValueError(
f"key path not recognized as a dictionary or a file path: "
f"[{type(key_path)}] {key_path}",
)
service = build("drive", "v3", credentials=creds) service = build("drive", "v3", credentials=creds)
if id: if id:
@ -85,7 +94,7 @@ class SimpleGoogleDriveConfig(ConfigSessionHandleMixin, BaseConnectorConfig):
# Google Drive Specific Options # Google Drive Specific Options
drive_id: str drive_id: str
service_account_key: str service_account_key: t.Union[str, dict]
extension: t.Optional[str] = None extension: t.Optional[str] = None
recursive: bool = False recursive: bool = False

View File

@ -9,7 +9,7 @@ from unstructured.ingest.runner.utils import update_download_dir_remote_url
class GCSRunner(FsspecBaseRunner): class GCSRunner(FsspecBaseRunner):
def run( def run(
self, self,
token: t.Optional[str] = None, service_account_key: t.Optional[t.Union[dict, str]] = None,
**kwargs, **kwargs,
): ):
ingest_log_streaming_init(logging.DEBUG if self.processor_config.verbose else logging.INFO) ingest_log_streaming_init(logging.DEBUG if self.processor_config.verbose else logging.INFO)
@ -24,7 +24,11 @@ class GCSRunner(FsspecBaseRunner):
from unstructured.ingest.connector.gcs import GcsSourceConnector, SimpleGcsConfig from unstructured.ingest.connector.gcs import GcsSourceConnector, SimpleGcsConfig
connector_config = SimpleGcsConfig.from_dict(self.fsspec_config.to_dict()) # type: ignore connector_config = SimpleGcsConfig.from_dict(self.fsspec_config.to_dict()) # type: ignore
connector_config.access_kwargs = {"token": token} access_kwargs = {}
if service_account_key:
access_kwargs["token"] = service_account_key
connector_config.access_kwargs = access_kwargs
source_doc_connector = GcsSourceConnector( # type: ignore source_doc_connector = GcsSourceConnector( # type: ignore
connector_config=connector_config, connector_config=connector_config,

View File

@ -10,7 +10,7 @@ from unstructured.ingest.runner.utils import update_download_dir_hash
class GoogleDriveRunner(Runner): class GoogleDriveRunner(Runner):
def run( def run(
self, self,
service_account_key: str, service_account_key: t.Union[str, dict],
drive_id: str, drive_id: str,
recursive: bool = False, recursive: bool = False,
extension: t.Optional[str] = None, extension: t.Optional[str] = None,

View File

@ -5,7 +5,7 @@ from unstructured.ingest.interfaces import BaseDestinationConnector
def gcs_writer( def gcs_writer(
remote_url: str, remote_url: str,
token: t.Optional[str], service_account_key: t.Optional[str],
verbose: bool = False, verbose: bool = False,
**kwargs, **kwargs,
) -> BaseDestinationConnector: ) -> BaseDestinationConnector:
@ -19,6 +19,6 @@ def gcs_writer(
write_config=FsspecWriteConfig(), write_config=FsspecWriteConfig(),
connector_config=SimpleGcsConfig( connector_config=SimpleGcsConfig(
remote_url=remote_url, remote_url=remote_url,
access_kwargs={"token": token}, access_kwargs={"token": service_account_key},
), ),
) )