Mirror of https://github.com/Unstructured-IO/unstructured.git (synced 2025-07-04 07:27:34 +00:00)

### Description

Building off of PR https://github.com/Unstructured-IO/unstructured/pull/2179, this PR updates the fsspec-based connectors to use better authentication field handling. It adds the following changes:

* Update the base classes to inherit from the enhanced JSON mixin.
* Add a new access config dataclass that should be used as a nested dataclass in the connector configs.
* Update the code that extracts configs from the CLI options dictionary to support the nested access config if it exists on the parent config.
* Update all fsspec connectors with explicit access configs, given what each one's SDK supports.
* Update the JSON mixin and enhanced field to support a name override when serializing/deserializing from JSON/dicts. This allows the CLI option to use a different name than the field on the dataclass.
* Update all the writers to use a class-based approach and share the same structure as the runner classes.
* The above update allowed for better code reuse in the base source and destination CLI commands.
* Add utility code for parsing a flat dictionary (coming from the click-based options) into dataclass-based configs with potentially nested dataclasses (a sketch follows below).

**Slightly unrelated changes:**

* The session handle was removed from the Pinecone connector, as it broke serialization of the write config and provided no benefit: a connection was never shared, and the index used simply makes a new HTTP call each time it is invoked.
* Dedicated write configs were created for all destination connectors to better support serialization.
* A refactor of the Elasticsearch connector is included, with an update to the ingest test to use auth.

**TODOs**

* Left a `#TODO` in the code, but the way the session handler is implemented right now, it breaks serialization because it adds a generic variable based on the library being used by a connector (e.g. `googleapiclient.discovery.Resource`), which is not serializable. This will need to be updated to omit that from serialization while still supporting the current workflow.

---------

Co-authored-by: ryannikolaidis <1208590+ryannikolaidis@users.noreply.github.com>
Co-authored-by: rbiseck3 <rbiseck3@users.noreply.github.com>
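The nested access-config pattern and the flat-dictionary parsing utility mentioned above can be illustrated with a minimal sketch. Everything here is illustrative: `AccessConfig`, `ExampleConnectorConfig`, and `flat_dict_to_config` are assumed names for this sketch, not the actual classes or helpers added in the PR.

```python
from dataclasses import dataclass, field, fields, is_dataclass
from typing import Optional


@dataclass
class AccessConfig:
    # Authentication-only fields, kept separate from the rest of the connector config.
    token: Optional[str] = None
    secret: Optional[str] = None


@dataclass
class ExampleConnectorConfig:
    remote_url: str = ""
    recursive: bool = False
    access_config: AccessConfig = field(default_factory=AccessConfig)


def flat_dict_to_config(options: dict, config_cls):
    """Map a flat dict of CLI options onto a dataclass, routing any keys that
    belong to a nested dataclass field (e.g. access_config) into that field."""
    kwargs = {}
    for f in fields(config_cls):
        if is_dataclass(f.type):
            nested_keys = {nf.name for nf in fields(f.type)}
            kwargs[f.name] = f.type(**{k: v for k, v in options.items() if k in nested_keys})
        elif f.name in options:
            kwargs[f.name] = options[f.name]
    return config_cls(**kwargs)


# Flat options as click would hand them over; "token" lands inside the nested AccessConfig.
config = flat_dict_to_config(
    {"remote_url": "s3://bucket/path", "recursive": True, "token": "abc123"},
    ExampleConnectorConfig,
)
print(config)
```

Keeping the authentication fields in their own nested dataclass makes it straightforward to handle, override, or redact them independently of the rest of the connector config.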
#!/usr/bin/env python3
# Helper script for the Elasticsearch ingest test: connects to the test
# cluster, creates the test index, and bulk-loads the CSV data into it.
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from es_cluster_config import (
    CLUSTER_URL,
    DATA_PATH,
    INDEX_NAME,
    MAPPINGS,
    form_elasticsearch_doc_dict,
)

print("Connecting to the Elasticsearch cluster.")
# Basic-auth credentials for the test cluster.
es = Elasticsearch(CLUSTER_URL, basic_auth=("elastic", "DkIedPPSCb"), request_timeout=30)
print(es.info())
df = pd.read_csv(DATA_PATH).dropna().reset_index()

print("Creating an Elasticsearch index for testing elasticsearch ingest.")
response = es.options(max_retries=5).indices.create(index=INDEX_NAME, mappings=MAPPINGS)
if response.meta.status != 200:
    raise RuntimeError("failed to create index")

print("Loading data into the index.")
bulk_data = []
for i, row in df.iterrows():
    bulk_data.append(form_elasticsearch_doc_dict(i, row))
bulk(es, bulk_data)

es.indices.refresh(index=INDEX_NAME)
response = es.cat.count(index=INDEX_NAME, format="json")

print("Successfully created and filled an Elasticsearch index for testing elasticsearch ingest.")
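For context on how this script relates to the connector refactor, here is a hedged sketch of an Elasticsearch connector config that carries the same basic-auth credentials through a nested access config. The class and field names (`ElasticsearchAccessConfig`, `SimpleElasticsearchConfig`) and the placeholder URL/index values are assumptions for illustration, not the connector's actual API.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ElasticsearchAccessConfig:  # assumed name, following the nested access-config pattern
    password: Optional[str] = None
    api_key: Optional[str] = None


@dataclass
class SimpleElasticsearchConfig:  # assumed name
    url: str
    index_name: str
    username: Optional[str] = None
    access_config: ElasticsearchAccessConfig = field(default_factory=ElasticsearchAccessConfig)


# Placeholder values; the script above reads CLUSTER_URL and INDEX_NAME from es_cluster_config.
connector_config = SimpleElasticsearchConfig(
    url="http://localhost:9200",
    index_name="ingest-test-index",
    username="elastic",
    access_config=ElasticsearchAccessConfig(password="DkIedPPSCb"),
)
print(connector_config)
```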