diff --git a/CHANGELOG.md b/CHANGELOG.md
index ffe31b3a5..ef428a17b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@
 
 * **Fix GCS connector converting JSON to string with single quotes.** FSSpec serialization caused conversion of JSON token to string with single quotes. GCS requires token in form of dict so this format is now assured.
 * **Fix the serialization of the Pinecone destination connector.** Presence of the PineconeIndex object breaks serialization due to TypeError: cannot pickle '_thread.lock' object. This removes that object before serialization.
+* **Fix the serialization of the Elasticsearch destination connector.** Presence of the _client object breaks serialization due to TypeError: cannot pickle '_thread.lock' object. This removes that object before serialization.
 
 ## 0.12.0
 
diff --git a/unstructured/ingest/connector/elasticsearch.py b/unstructured/ingest/connector/elasticsearch.py
index 4e0b77073..a3ed55ead 100644
--- a/unstructured/ingest/connector/elasticsearch.py
+++ b/unstructured/ingest/connector/elasticsearch.py
@@ -1,3 +1,4 @@
+import copy
 import hashlib
 import typing as t
 import uuid
@@ -7,6 +8,7 @@ from pathlib import Path
 from dataclasses_json.core import Json
 
 from unstructured.ingest.enhanced_dataclass import enhanced_field
+from unstructured.ingest.enhanced_dataclass.core import _asdict
 from unstructured.ingest.error import DestinationConnectionError, SourceConnectionError
 from unstructured.ingest.interfaces import (
     AccessConfig,
@@ -318,6 +320,18 @@ class ElasticsearchDestinationConnector(BaseDestinationConnector):
     connector_config: SimpleElasticsearchConfig
     _client: t.Optional["Elasticsearch"] = field(init=False, default=None)
 
+    def to_dict(self, **kwargs):
+        """Serialize this connector to a dict with the live ``_client`` dropped.
+
+        The ``_client`` object breaks deepcopy while serializing
+        (TypeError: cannot pickle '_thread.lock' object), so a shallow copy
+        whose ``_client`` is reset to None is serialized instead; ``self`` is
+        left untouched. The client must be reinitialized after deserializing.
+        """
+        self_cp = copy.copy(self)
+        self_cp._client = None
+        return _asdict(self_cp, **kwargs)
+
     @DestinationConnectionError.wrap
     @requires_dependencies(["elasticsearch"], extras="elasticsearch")
     def generate_client(self) -> "Elasticsearch":