bug: fix check_connection for azure, opensearch, and postgres (#2497)

fixes check_connection for:
azure
opensearch
postgres

For Azure, the generic check_connection in fsspec.py actually worked better, so the Azure-specific override is removed.

Adding check_connection for Databricks Volumes

---------

Co-authored-by: potter-potter <david.potter@gmail.com>
This commit is contained in:
David Potter 2024-02-09 06:33:12 -08:00 committed by GitHub
parent dd6576c603
commit d11c70cf83
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 24 additions and 23 deletions

View File

@ -1,4 +1,4 @@
## 0.12.5-dev0 ## 0.12.5-dev1
### Enhancements ### Enhancements
@ -6,6 +6,8 @@
### Fixes ### Fixes
* **Fix `check_connection` in opensearch, databricks, postgres, azure connectors**
## 0.12.4 ## 0.12.4
### Enhancements ### Enhancements

View File

@ -1 +1 @@
__version__ = "0.12.5-dev0" # pragma: no cover __version__ = "0.12.5-dev1" # pragma: no cover

View File

@ -8,6 +8,7 @@ from pathlib import PurePath
from unstructured.ingest.enhanced_dataclass import enhanced_field from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.enhanced_dataclass.core import _asdict from unstructured.ingest.enhanced_dataclass.core import _asdict
from unstructured.ingest.error import DestinationConnectionError
from unstructured.ingest.interfaces import ( from unstructured.ingest.interfaces import (
AccessConfig, AccessConfig,
BaseConnectorConfig, BaseConnectorConfig,
@ -77,7 +78,7 @@ class DatabricksVolumesDestinationConnector(BaseDestinationConnector):
setattr(self_cp, "_client", None) setattr(self_cp, "_client", None)
return _asdict(self_cp, **kwargs) return _asdict(self_cp, **kwargs)
@requires_dependencies(dependencies=["databricks.sdk"], extras="databricks") @requires_dependencies(dependencies=["databricks.sdk"], extras="databricks-volumes")
def generate_client(self) -> "WorkspaceClient": def generate_client(self) -> "WorkspaceClient":
from databricks.sdk import WorkspaceClient from databricks.sdk import WorkspaceClient
@ -92,7 +93,11 @@ class DatabricksVolumesDestinationConnector(BaseDestinationConnector):
return self._client return self._client
def check_connection(self): def check_connection(self):
pass try:
assert self.client.current_user.me().active
except Exception as e:
logger.error(f"failed to validate connection: {e}", exc_info=True)
raise DestinationConnectionError(f"failed to validate connection: {e}")
def initialize(self): def initialize(self):
_ = self.client _ = self.client

View File

@ -54,16 +54,6 @@ class AzureBlobStorageIngestDoc(FsspecIngestDoc):
class AzureBlobStorageSourceConnector(FsspecSourceConnector): class AzureBlobStorageSourceConnector(FsspecSourceConnector):
connector_config: SimpleAzureBlobStorageConfig connector_config: SimpleAzureBlobStorageConfig
@requires_dependencies(["adlfs"], extras="azure")
def check_connection(self):
from adlfs import AzureBlobFileSystem
try:
AzureBlobFileSystem(**self.connector_config.get_access_config())
except ValueError as connection_error:
logger.error(f"failed to validate connection: {connection_error}", exc_info=True)
raise SourceConnectionError(f"failed to validate connection: {connection_error}")
def __post_init__(self): def __post_init__(self):
self.ingest_doc_cls: t.Type[AzureBlobStorageIngestDoc] = AzureBlobStorageIngestDoc self.ingest_doc_cls: t.Type[AzureBlobStorageIngestDoc] = AzureBlobStorageIngestDoc

View File

@ -170,7 +170,7 @@ class FsspecSourceConnector(
fs = get_filesystem_class(self.connector_config.protocol)( fs = get_filesystem_class(self.connector_config.protocol)(
**self.connector_config.get_access_config(), **self.connector_config.get_access_config(),
) )
fs.ls(path=self.connector_config.path_without_protocol) fs.ls(path=self.connector_config.path_without_protocol, detail=False)
except Exception as e: except Exception as e:
logger.error(f"failed to validate connection: {e}", exc_info=True) logger.error(f"failed to validate connection: {e}", exc_info=True)
raise SourceConnectionError(f"failed to validate connection: {e}") raise SourceConnectionError(f"failed to validate connection: {e}")
@ -186,7 +186,7 @@ class FsspecSourceConnector(
) )
"""Verify that can get metadata for an object, validates connections info.""" """Verify that can get metadata for an object, validates connections info."""
ls_output = self.fs.ls(self.connector_config.path_without_protocol) ls_output = self.fs.ls(self.connector_config.path_without_protocol, detail=False)
if len(ls_output) < 1: if len(ls_output) < 1:
raise ValueError( raise ValueError(
f"No objects found in {self.connector_config.remote_url}.", f"No objects found in {self.connector_config.remote_url}.",
@ -308,7 +308,7 @@ class FsspecDestinationConnector(BaseDestinationConnector):
bucket_name += self.connector_config.dir_path.split("/")[0] bucket_name += self.connector_config.dir_path.split("/")[0]
logger.info(f"checking connection for destination {bucket_name}") logger.info(f"checking connection for destination {bucket_name}")
fs.ls(path=bucket_name) fs.ls(path=bucket_name, detail=False)
except Exception as e: except Exception as e:
logger.error(f"failed to validate connection: {e}", exc_info=True) logger.error(f"failed to validate connection: {e}", exc_info=True)
raise DestinationConnectionError(f"failed to validate connection: {e}") raise DestinationConnectionError(f"failed to validate connection: {e}")

View File

@ -140,7 +140,7 @@ class OpenSearchSourceConnector(ElasticsearchSourceConnector):
def check_connection(self): def check_connection(self):
try: try:
self.ops.ping() assert self.ops.ping()
except Exception as e: except Exception as e:
logger.error(f"failed to validate connection: {e}", exc_info=True) logger.error(f"failed to validate connection: {e}", exc_info=True)
raise SourceConnectionError(f"failed to validate connection: {e}") raise SourceConnectionError(f"failed to validate connection: {e}")

View File

@ -52,7 +52,7 @@ class SimpleSqlConfig(BaseConnectorConfig):
return connect(database=self.database) return connect(database=self.database)
@requires_dependencies(["psycopg2"], extras="postgresql") @requires_dependencies(["psycopg2"], extras="postgres")
def _make_psycopg_connection(self): def _make_psycopg_connection(self):
from psycopg2 import connect from psycopg2 import connect
@ -93,9 +93,13 @@ class SqlDestinationConnector(BaseDestinationConnector):
_ = self.client _ = self.client
def check_connection(self): def check_connection(self):
try:
cursor = self.client.cursor() cursor = self.client.cursor()
cursor.execute("SELECT 1;") cursor.execute("SELECT 1;")
cursor.close() cursor.close()
except Exception as e:
logger.error(f"failed to validate connection: {e}", exc_info=True)
raise DestinationConnectionError(f"failed to validate connection: {e}")
def conform_dict(self, data: dict) -> None: def conform_dict(self, data: dict) -> None:
""" """