mirror of
				https://github.com/Unstructured-IO/unstructured.git
				synced 2025-10-25 06:51:34 +00:00 
			
		
		
		
	 fd293b3e78
			
		
	
	
		fd293b3e78
		
			
		
	
	
	
	
		
			
			Closes https://github.com/Unstructured-IO/unstructured/issues/1842
Closes https://github.com/Unstructured-IO/unstructured/issues/2202
Closes https://github.com/Unstructured-IO/unstructured/issues/2203
This PR:
- Adds Elasticsearch destination connector to be able to ingest
documents from any supported source, embed them and write the embeddings
/ documents into Elasticsearch.
- Defines an example unstructured elements schema for users to be able
to set up their unstructured Elasticsearch indexes easily.
- Includes parallelized upload and lazy processing for elasticsearch
destination connector.
- Rearranges elasticsearch test helpers to source, destination, and
common folders.
- Adds utility functions to be able to batch iterables in a lazy way for
uploads.
- Fixes a bug where removing the optional parameter `--fields` broke the
connector due to an integer processing error.
- Fixes a bug where using an [elasticsearch
config](https://github.com/Unstructured-IO/unstructured/blob/8fa5cbf036/unstructured/ingest/connector/elasticsearch.py#L26-L35)
for a destination connector resulted in a serialization issue when the
optional parameter `--fields` was not provided.
		
	
			
		
			
				
	
	
		
			61 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			61 lines
		
	
	
		
			1.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| 
 | |
| from unstructured.ingest.connector.elasticsearch import (
 | |
|     ElasticsearchAccessConfig,
 | |
|     ElasticsearchWriteConfig,
 | |
|     SimpleElasticsearchConfig,
 | |
| )
 | |
| from unstructured.ingest.connector.local import SimpleLocalConfig
 | |
| from unstructured.ingest.interfaces import (
 | |
|     ChunkingConfig,
 | |
|     EmbeddingConfig,
 | |
|     PartitionConfig,
 | |
|     ProcessorConfig,
 | |
|     ReadConfig,
 | |
| )
 | |
| from unstructured.ingest.runner import LocalRunner
 | |
| from unstructured.ingest.runner.writers.base_writer import Writer
 | |
| from unstructured.ingest.runner.writers.elasticsearch import (
 | |
|     ElasticsearchWriter,
 | |
| )
 | |
| 
 | |
| 
 | |
def get_writer() -> Writer:
    """Build the Elasticsearch writer used as this example's destination.

    Connection settings are read from the environment variables
    ``ELASTICSEARCH_HOSTS``, ``ELASTICSEARCH_USERNAME``,
    ``ELASTICSEARCH_PASSWORD`` and ``ELASTICSEARCH_INDEX_NAME``. A variable
    that is not set is passed through as ``None``.

    Returns:
        An ``ElasticsearchWriter`` wired with the connector and write configs.
    """
    credentials = ElasticsearchAccessConfig(
        hosts=os.getenv("ELASTICSEARCH_HOSTS"),
        username=os.getenv("ELASTICSEARCH_USERNAME"),
        password=os.getenv("ELASTICSEARCH_PASSWORD"),
    )
    destination = SimpleElasticsearchConfig(
        access_config=credentials,
        index_name=os.getenv("ELASTICSEARCH_INDEX_NAME"),
    )
    # NOTE(review): presumably a per-batch upload size limit in bytes and the
    # number of parallel upload processes -- confirm against the connector.
    upload_settings = ElasticsearchWriteConfig(
        batch_size_bytes=15_000_000,
        num_processes=2,
    )
    return ElasticsearchWriter(
        connector_config=destination,
        write_config=upload_settings,
    )
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Script entry point: run a LocalRunner over a single local text file and
    # hand its output to the Elasticsearch writer built by get_writer().
    es_writer = get_writer()

    processing = ProcessorConfig(
        verbose=True,
        output_dir="local-output-to-elasticsearch",
        num_processes=2,
    )
    local_source = SimpleLocalConfig(
        input_path="example-docs/book-war-and-peace-1225p.txt",
    )
    reading = ReadConfig()
    partitioning = PartitionConfig()
    chunking = ChunkingConfig(chunk_elements=True)
    embedding = EmbeddingConfig(
        provider="langchain-huggingface",
    )

    LocalRunner(
        processor_config=processing,
        connector_config=local_source,
        read_config=reading,
        partition_config=partitioning,
        chunking_config=chunking,
        embedding_config=embedding,
        writer=es_writer,
        writer_kwargs={},
    ).run()
 |