fix: elasticsearch serialization issue (#2399)

This fixes serialization of the Elasticsearch destination connector.
The presence of the `_client` object breaks serialization with
`TypeError: cannot pickle '_thread.lock' object`. The `_client` object is
now removed before serialization.
This commit is contained in:
ryannikolaidis 2024-01-14 15:07:37 -08:00 committed by GitHub
parent f07fc6e03a
commit d7980b3665
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 0 deletions

View File

@ -14,6 +14,7 @@
* **Fix GCS connector converting JSON to string with single quotes.** FSSpec serialization caused conversion of JSON token to string with single quotes. GCS requires token in form of dict so this format is now assured. * **Fix GCS connector converting JSON to string with single quotes.** FSSpec serialization caused conversion of JSON token to string with single quotes. GCS requires token in form of dict so this format is now assured.
* **Fix the serialization of the Pinecone destination connector.** Presence of the PineconeIndex object breaks serialization due to TypeError: cannot pickle '_thread.lock' object. This removes that object before serialization. * **Fix the serialization of the Pinecone destination connector.** Presence of the PineconeIndex object breaks serialization due to TypeError: cannot pickle '_thread.lock' object. This removes that object before serialization.
* **Fix the serialization of the Elasticsearch destination connector.** Presence of the _client object breaks serialization due to TypeError: cannot pickle '_thread.lock' object. This removes that object before serialization.
## 0.12.0 ## 0.12.0

View File

@ -1,3 +1,4 @@
import copy
import hashlib import hashlib
import typing as t import typing as t
import uuid import uuid
@ -7,6 +8,7 @@ from pathlib import Path
from dataclasses_json.core import Json from dataclasses_json.core import Json
from unstructured.ingest.enhanced_dataclass import enhanced_field from unstructured.ingest.enhanced_dataclass import enhanced_field
from unstructured.ingest.enhanced_dataclass.core import _asdict
from unstructured.ingest.error import DestinationConnectionError, SourceConnectionError from unstructured.ingest.error import DestinationConnectionError, SourceConnectionError
from unstructured.ingest.interfaces import ( from unstructured.ingest.interfaces import (
AccessConfig, AccessConfig,
@ -318,6 +320,18 @@ class ElasticsearchDestinationConnector(BaseDestinationConnector):
connector_config: SimpleElasticsearchConfig connector_config: SimpleElasticsearchConfig
_client: t.Optional["Elasticsearch"] = field(init=False, default=None) _client: t.Optional["Elasticsearch"] = field(init=False, default=None)
def to_dict(self, **kwargs):
    """Serialize this connector to a dict without its live client.

    A populated ``_client`` makes deepcopy fail with
    ``TypeError: cannot pickle '_thread.lock' object``, so serialization
    runs on a shallow copy whose ``_client`` is reset to ``None``. The
    instance this is called on is left untouched; anything rebuilt from
    the returned data has to reinitialize its client.

    Args:
        **kwargs: Forwarded unchanged to ``_asdict``.

    Returns:
        Whatever ``_asdict`` produces for the client-less copy.
    """
    # Shallow copy only: a deep copy would hit the unpicklable lock that
    # this method exists to avoid.
    clientless = copy.copy(self)
    if hasattr(clientless, "_client"):
        clientless._client = None
    return _asdict(clientless, **kwargs)
@DestinationConnectionError.wrap @DestinationConnectionError.wrap
@requires_dependencies(["elasticsearch"], extras="elasticsearch") @requires_dependencies(["elasticsearch"], extras="elasticsearch")
def generate_client(self) -> "Elasticsearch": def generate_client(self) -> "Elasticsearch":