feat: adds Azure Cognitive Search (full text) destination connector (#1459)

### Description
New [Azure Cognitive
Search](https://azure.microsoft.com/en-us/products/ai-services/cognitive-search)
destination connector added. It takes the JSON files produced by partitioning
during ingest and writes each JSON element to an Azure Cognitive Search index.
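
For reference, a minimal sketch of the write path this connector implements
(assumes the `azure-search-documents` package; the endpoint, key, and index
values below are hypothetical placeholders):

```python
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient

# Hypothetical service endpoint, API key, and index name, for illustration only.
client = SearchClient(
    endpoint="https://<service-name>.search.windows.net",
    index_name="<index-name>",
    credential=AzureKeyCredential("<api-key>"),
)

# Each element dict from a partitioned JSON output file becomes one search document.
elements = [
    {"id": "d1", "element_id": "abc123", "type": "NarrativeText", "text": "Hello world"},
]
results = client.upload_documents(documents=elements)
print([r.succeeded for r in results])
```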

**Bonus bug fix:** A recent change bumped the default version of python used
in the repo from `3.8` to `3.10`, so running `pip-compile` now resolves
dependencies against that version rather than the lowest we still support,
which is `3.8`. This breaks the setup for those lower versions because some of
the versions pulled in by `pip-compile` exist for `3.10` but not `3.8`.
`pip-compile` was updated to run as a script that first checks the version of
python being used, which helps guarantee that all dependencies meet the
minimum python version requirement.
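
The guard added in `scripts/pip-compile.sh` (included in full further down in
this diff) amounts to a check along these lines before compiling:

```shell
# Refuse to compile requirements unless the interpreter is the lowest supported version (3.8).
if ! python -c "import sys; assert sys.version_info[:2] == (3, 8)"; then
  echo "expected python 3.8, got: $(python --version)"
  exit 1
fi
```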

Closes out https://github.com/Unstructured-IO/unstructured/issues/1466
Roman Isecke 2023-09-25 10:27:42 -04:00, committed by GitHub
parent 5d193c8e5a
commit bd49cfbab7
25 changed files with 899 additions and 31 deletions


@ -289,6 +289,8 @@ jobs:
NOTION_API_KEY: ${{ secrets.NOTION_API_KEY }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AZURE_SEARCH_ENDPOINT: ${{ secrets.AZURE_SEARCH_ENDPOINT }}
AZURE_SEARCH_API_KEY: ${{ secrets.AZURE_SEARCH_API_KEY }}
TABLE_OCR: "tesseract"
ENTIRE_PAGE_OCR: "tesseract"
run: |


@ -86,6 +86,8 @@ jobs:
NOTION_API_KEY: ${{ secrets.NOTION_API_KEY }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AZURE_SEARCH_ENDPOINT: ${{ secrets.AZURE_SEARCH_ENDPOINT }}
AZURE_SEARCH_API_KEY: ${{ secrets.AZURE_SEARCH_API_KEY }}
TABLE_OCR: "tesseract"
ENTIRE_PAGE_OCR: "tesseract"
OVERWRITE_FIXTURES: "true"


@ -4,6 +4,7 @@
* **Adds data source properties to SharePoint, Outlook, Onedrive, Reddit, and Slack connectors** These properties (date_created, date_modified, version, source_url, record_locator) are written to element metadata during ingest, mapping elements to information about the document source from which they derive. This functionality enables downstream applications to reveal source document applications, e.g. a link to a GDrive doc, Salesforce record, etc.
* **Add functionality to save embedded images in PDF's separately as images** This allows users to save embedded images in PDF's separately as images, given some directory path. The saved image path is written to the metadata for the Image element. Downstream applications may benefit by providing users with image links from relevant "hits."
* **Azure Cognitive Search destination connector** New Azure Cognitive Search destination connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned data from over 20 data sources (so far) to an Azure Cognitive Search index.
### Features


@ -221,14 +221,7 @@ install-paddleocr:
## pip-compile: compiles all base/dev/test requirements
.PHONY: pip-compile
pip-compile:
@for file in $(shell ls requirements/*.in); do \
if [[ "$${file}" =~ "constraints" ]]; then \
continue; \
fi; \
echo "running: pip-compile --upgrade $${file}"; \
pip-compile --upgrade $${file}; \
done
cp requirements/build.txt docs/requirements.txt
@scripts/pip-compile.sh


@ -9,3 +9,4 @@ in our community `Slack. <https://join.slack.com/t/unstructuredw-kbe4326/shared_
:maxdepth: 1
destination_connectors/delta_table
destination_connectors/azure_cognitive_search


@ -0,0 +1,166 @@
{
"@odata.context": "https://utic-test-ingest-fixtures.search.windows.net/$metadata#indexes/$entity",
"@odata.etag": "\"0x8DBB93E09C8F4BD\"",
"name": "your-index-here",
"fields": [
{
"name": "id",
"type": "Edm.String",
"key": true
},
{
"name": "element_id",
"type": "Edm.String"
},
{
"name": "text",
"type": "Edm.String"
},
{
"name": "type",
"type": "Edm.String"
},
{
"name": "metadata",
"type": "Edm.ComplexType",
"fields": [
{
"name": "category_depth",
"type": "Edm.Int32"
},
{
"name": "parent_id",
"type": "Edm.String"
},
{
"name": "attached_to_filename",
"type": "Edm.String"
},
{
"name": "filetype",
"type": "Edm.String"
},
{
"name": "last_modified",
"type": "Edm.DateTimeOffset"
},
{
"name": "file_directory",
"type": "Edm.String"
},
{
"name": "filename",
"type": "Edm.String"
},
{
"name": "data_source",
"type": "Edm.ComplexType",
"fields": [
{
"name": "url",
"type": "Edm.String"
},
{
"name": "version",
"type": "Edm.String"
},
{
"name": "date_created",
"type": "Edm.DateTimeOffset"
},
{
"name": "date_modified",
"type": "Edm.DateTimeOffset"
},
{
"name": "date_processed",
"type": "Edm.DateTimeOffset"
},
{
"name": "record_locator",
"type": "Edm.String"
}
]
},
{
"name": "coordinates",
"type": "Edm.ComplexType",
"fields": [
{
"name": "system",
"type": "Edm.String"
},
{
"name": "layout_width",
"type": "Edm.Double"
},
{
"name": "layout_height",
"type": "Edm.Double"
},
{
"name": "points",
"type": "Edm.String"
}
]
},
{
"name": "page_number",
"type": "Edm.String"
},
{
"name": "url",
"type": "Edm.String"
},
{
"name": "link_urls",
"type": "Collection(Edm.String)"
},
{
"name": "link_texts",
"type": "Collection(Edm.String)"
},
{
"name": "sent_from",
"type": "Collection(Edm.String)"
},
{
"name": "sent_to",
"type": "Collection(Edm.String)"
},
{
"name": "subject",
"type": "Edm.String"
},
{
"name": "section",
"type": "Edm.String"
},
{
"name": "header_footer_type",
"type": "Edm.String"
},
{
"name": "emphasized_text_contents",
"type": "Collection(Edm.String)"
},
{
"name": "emphasized_text_tags",
"type": "Collection(Edm.String)"
},
{
"name": "text_as_html",
"type": "Edm.String"
},
{
"name": "regex_metadata",
"type": "Edm.String"
},
{
"name": "detection_class_prob",
"type": "Edm.Double"
}
]
}
]
}


@ -0,0 +1,81 @@
Azure Cognitive Search
======================
Batch process all your records using ``unstructured-ingest`` to store structured outputs locally on your filesystem and upload those local files to an Azure Cognitive Search index.
First you'll need to install the Azure Cognitive Search dependencies as shown here.
.. code:: shell
pip install "unstructured[azure-cognitive-search]"
Run Locally
-----------
The upstream connector can be any of the ones supported; for convenience, the sample command below uses the
upstream s3 connector.
.. tabs::
.. tab:: Shell
.. code:: shell
unstructured-ingest \
s3 \
--remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \
--anonymous \
--output-dir s3-small-batch-output-to-azure \
--num-processes 2 \
--verbose \
--strategy fast \
azure-cognitive-search \
--key "$AZURE_SEARCH_API_KEY" \
--endpoint "$AZURE_SEARCH_ENDPOINT" \
--index utic-test-ingest-fixtures-output
.. tab:: Python
.. code:: python
import os
import subprocess
command = [
"unstructured-ingest",
"s3",
"--remote-url", "s3://utic-dev-tech-fixtures/small-pdf-set/",
"--anonymous",
"--output-dir", "s3-small-batch-output-to-azure",
"--num-processes", "2",
"--verbose",
"--strategy", "fast",
"azure-cognitive-search",
"--key", os.getenv("AZURE_SEARCH_API_KEY"),
"--endpoint", os.getenv("$AZURE_SEARCH_ENDPOINT"),
"--index", "utic-test-ingest-fixtures-output",
]
# Run the command
process = subprocess.Popen(command, stdout=subprocess.PIPE)
output, error = process.communicate()
# Print output
if process.returncode == 0:
print("Command executed successfully. Output:")
print(output.decode())
else:
print("Command failed. Error:")
print(error.decode())
For a full list of the options the CLI accepts, check ``unstructured-ingest <upstream connector> azure-cognitive-search --help``.
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.
Sample Index Schema
-------------------
To make sure the schema of the index matches the data being written to it, a sample schema JSON can be used:
.. literalinclude:: azure_cognitive_sample_index_schema.json
:language: json
:linenos:
:caption: Object description
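The sample schema can also be used to create the index ahead of an ingest run via the service's REST API; a sketch, substituting your own service name, index name, and API key:
.. code:: shell
curl -X PUT \
"https://<service-name>.search.windows.net/indexes/<index-name>?api-version=2020-06-30" \
--header "api-key: $AZURE_SEARCH_API_KEY" \
--header "content-type: application/json" \
--data "@azure_cognitive_sample_index_schema.json"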


@ -1,11 +0,0 @@
Downstream Connectors
===================
Connect to your favorite data storage platforms for effortless batch processing of your files.
We are constantly adding new data connectors and if you don't see your favorite platform let us know
in our community `Slack. <https://join.slack.com/t/unstructuredw-kbe4326/shared_invite/zt-20kd2e9ti-q5yz7RCa2nlyqmAba9vqRw>`_
.. toctree::
:maxdepth: 1
downstream_connectors/delta_table


@ -0,0 +1,22 @@
#!/usr/bin/env bash
# Processes the files from s3://utic-dev-tech-fixtures/small-pdf-set/
# using the `unstructured` library and writes the results to an Azure Cognitive Search index.
# Structured outputs are stored in s3-small-batch-output-to-azure/
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd "$SCRIPT_DIR"/../../.. || exit 1
PYTHONPATH=. ./unstructured/ingest/main.py \
s3 \
--remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \
--anonymous \
--output-dir s3-small-batch-output-to-azure \
--num-processes 2 \
--verbose \
--strategy fast \
azure-cognitive-search \
--key "$AZURE_SEARCH_API_KEY" \
--endpoint "$AZURE_SEARCH_ENDPOINT" \
--index utic-test-ingest-fixtures-output


@ -37,4 +37,4 @@ weaviate-client==3.23.2
# Note(yuming) - pining to avoid conflict with paddle install
matplotlib==3.7.2
# NOTE(crag) - pin to available pandas for python 3.8 (at least in CI)
pandas<2.1.1
pandas<2.0.4


@ -69,7 +69,9 @@ defusedxml==0.7.1
distlib==0.3.7
# via virtualenv
exceptiongroup==1.1.3
# via anyio
# via
# -c requirements/test.txt
# anyio
executing==1.2.0
# via stack-data
fastjsonschema==2.18.0
@ -344,6 +346,7 @@ tinycss2==1.2.1
# via nbconvert
tomli==2.0.1
# via
# -c requirements/test.txt
# build
# jupyterlab
# pip-tools


@ -0,0 +1,3 @@
-c constraints.in
-c base.txt
azure-search-documents


@ -0,0 +1,56 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile requirements/ingest-azure-cognitive-search.in
#
azure-common==1.1.28
# via azure-search-documents
azure-core==1.29.4
# via
# azure-search-documents
# msrest
azure-search-documents==11.3.0
# via -r requirements/ingest-azure-cognitive-search.in
certifi==2023.7.22
# via
# -c requirements/base.txt
# -c requirements/constraints.in
# msrest
# requests
charset-normalizer==3.2.0
# via
# -c requirements/base.txt
# requests
idna==3.4
# via
# -c requirements/base.txt
# requests
isodate==0.6.1
# via msrest
msrest==0.7.1
# via azure-search-documents
oauthlib==3.2.2
# via requests-oauthlib
requests==2.31.0
# via
# -c requirements/base.txt
# azure-core
# msrest
# requests-oauthlib
requests-oauthlib==1.3.1
# via msrest
six==1.16.0
# via
# azure-core
# isodate
typing-extensions==4.8.0
# via
# -c requirements/base.txt
# azure-core
# azure-search-documents
urllib3==1.26.16
# via
# -c requirements/base.txt
# -c requirements/constraints.in
# requests


@ -11,7 +11,7 @@ certifi==2023.7.22
# elastic-transport
elastic-transport==8.4.0
# via elasticsearch
elasticsearch==8.9.0
elasticsearch==8.10.0
# via -r requirements/ingest-elasticsearch.in
jq==1.6.0
# via -r requirements/ingest-elasticsearch.in

scripts/pip-compile.sh Executable file

@ -0,0 +1,18 @@
#!/usr/bin/env bash
# python version must match lowest supported (3.8)
major=3
minor=8
if ! python -c "import sys; assert sys.version_info.major == $major and sys.version_info.minor == $minor"; then
echo "python version not equal to expected $major.$minor: $(python --version)"
exit 1
fi
for file in requirements/*.in; do
if [[ "$file" =~ "constraints" ]]; then
continue;
fi;
echo "running: pip-compile --upgrade $file"
pip-compile --upgrade "$file"
done
cp requirements/build.txt docs/requirements.txt


@ -129,6 +129,9 @@ setup(
# Extra requirements for data connectors
"s3": load_requirements("requirements/ingest-s3.in"),
"azure": load_requirements("requirements/ingest-azure.in"),
"azure-cognitive-search": load_requirements(
"requirements/ingest-azure-cognitive-search.in",
),
"biomed": load_requirements("requirements/ingest-biomed.in"),
"discord": load_requirements("requirements/ingest-discord.in"),
"github": load_requirements("requirements/ingest-github.in"),


@ -0,0 +1,165 @@
{
"@odata.context": "https://utic-test-ingest-fixtures.search.windows.net/$metadata#indexes/$entity",
"@odata.etag": "\"0x8DBB93E09C8F4BD\"",
"fields": [
{
"name": "id",
"type": "Edm.String",
"key": true
},
{
"name": "element_id",
"type": "Edm.String"
},
{
"name": "text",
"type": "Edm.String"
},
{
"name": "type",
"type": "Edm.String"
},
{
"name": "metadata",
"type": "Edm.ComplexType",
"fields": [
{
"name": "category_depth",
"type": "Edm.Int32"
},
{
"name": "parent_id",
"type": "Edm.String"
},
{
"name": "attached_to_filename",
"type": "Edm.String"
},
{
"name": "filetype",
"type": "Edm.String"
},
{
"name": "last_modified",
"type": "Edm.DateTimeOffset"
},
{
"name": "file_directory",
"type": "Edm.String"
},
{
"name": "filename",
"type": "Edm.String"
},
{
"name": "data_source",
"type": "Edm.ComplexType",
"fields": [
{
"name": "url",
"type": "Edm.String"
},
{
"name": "version",
"type": "Edm.String"
},
{
"name": "date_created",
"type": "Edm.DateTimeOffset"
},
{
"name": "date_modified",
"type": "Edm.DateTimeOffset"
},
{
"name": "date_processed",
"type": "Edm.DateTimeOffset"
},
{
"name": "record_locator",
"type": "Edm.String"
}
]
},
{
"name": "coordinates",
"type": "Edm.ComplexType",
"fields": [
{
"name": "system",
"type": "Edm.String"
},
{
"name": "layout_width",
"type": "Edm.Double"
},
{
"name": "layout_height",
"type": "Edm.Double"
},
{
"name": "points",
"type": "Edm.String"
}
]
},
{
"name": "page_number",
"type": "Edm.String"
},
{
"name": "url",
"type": "Edm.String"
},
{
"name": "link_urls",
"type": "Collection(Edm.String)"
},
{
"name": "link_texts",
"type": "Collection(Edm.String)"
},
{
"name": "sent_from",
"type": "Collection(Edm.String)"
},
{
"name": "sent_to",
"type": "Collection(Edm.String)"
},
{
"name": "subject",
"type": "Edm.String"
},
{
"name": "section",
"type": "Edm.String"
},
{
"name": "header_footer_type",
"type": "Edm.String"
},
{
"name": "emphasized_text_contents",
"type": "Collection(Edm.String)"
},
{
"name": "emphasized_text_tags",
"type": "Collection(Edm.String)"
},
{
"name": "text_as_html",
"type": "Edm.String"
},
{
"name": "regex_metadata",
"type": "Edm.String"
},
{
"name": "detection_class_prob",
"type": "Edm.Double"
}
]
}
]
}


@ -0,0 +1,85 @@
#!/usr/bin/env bash
set -e
SCRIPT_DIR=$(dirname "$(realpath "$0")")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=s3
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
DESTINATION_INDEX="utic-test-ingest-fixtures-output-$(date +%s)"
API_VERSION=2020-06-30
if [ -z "$AZURE_SEARCH_ENDPOINT" ] && [ -z "$AZURE_SEARCH_API_KEY" ]; then
echo "Skipping Azure Cognitive Search ingest test because neither AZURE_SEARCH_ENDPOINT nor AZURE_SEARCH_API_KEY env vars are set."
exit 0
fi
function cleanup {
response_code=$(curl -s -o /dev/null -w "%{http_code}" \
"https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \
--header "api-key: JV1LDVRivKEY9J9rHBQqQeTvaGoYbD670RWRaANxaTAzSeDy8Eon" \
--header 'content-type: application/json')
if [ "$response_code" == "200" ]; then
echo "deleting index $DESTINATION_INDEX"
curl -X DELETE \
"https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \
--header "api-key: JV1LDVRivKEY9J9rHBQqQeTvaGoYbD670RWRaANxaTAzSeDy8Eon" \
--header 'content-type: application/json'
else
echo "Index $DESTINATION_INDEX does not exist, nothing to delete"
fi
}
trap cleanup EXIT
# Create index
echo "Creating index $DESTINATION_INDEX"
response_code=$(curl -s -o /dev/null -w "%{http_code}" -X PUT \
"https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \
--header "api-key: JV1LDVRivKEY9J9rHBQqQeTvaGoYbD670RWRaANxaTAzSeDy8Eon" \
--header 'content-type: application/json' \
--data "@$SCRIPT_DIR/files/azure_cognitive_index_schema.json")
if [ "$response_code" -lt 400 ]; then
echo "Index creation success: $response_code"
else
echo "Index creation failure: $response_code"
exit 1
fi
PYTHONPATH=. ./unstructured/ingest/main.py \
s3 \
--download-dir "$DOWNLOAD_DIR" \
--metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \
--strategy fast \
--preserve-downloads \
--reprocess \
--output-dir "$OUTPUT_DIR" \
--verbose \
--remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \
--anonymous \
azure-cognitive-search \
--key "$AZURE_SEARCH_API_KEY" \
--endpoint "$AZURE_SEARCH_ENDPOINT" \
--index "$DESTINATION_INDEX"
echo "sleeping 5 seconds to let index finish catching up after writes"
sleep 5
# Check the contents of the index
docs_count=$(curl "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX/docs/\$count?api-version=$API_VERSION" \
--header "api-key: $AZURE_SEARCH_API_KEY" \
--header 'content-type: application/json' | jq)
expected_docs_count=0
for i in $(jq length "$OUTPUT_DIR"/*); do
expected_docs_count=$((expected_docs_count+i));
done
if [ "$docs_count" -ne "$expected_docs_count" ];then
echo "Number of docs $docs_count doesn't match the expected docs: $expected_docs_count"
exit 1
fi


@ -4,6 +4,7 @@ import click
from .airtable import get_source_cmd as airtable_src
from .azure import get_source_cmd as azure_src
from .azure_cognitive_search import get_dest_cmd as azure_cognitive_search_dest
from .biomed import get_source_cmd as biomed_src
from .box import get_source_cmd as box_src
from .confluence import get_source_cmd as confluence_src
@ -58,7 +59,7 @@ src: t.List[click.Group] = [
wikipedia_src(),
]
dest: t.List[click.Command] = [s3_dest(), delta_table_dest()]
dest: t.List[click.Command] = [azure_cognitive_search_dest(), s3_dest(), delta_table_dest()]
__all__ = [
"src",


@ -0,0 +1,92 @@
import logging
from dataclasses import dataclass
import click
from unstructured.ingest.cli.cmds.utils import conform_click_options
from unstructured.ingest.cli.common import (
log_options,
)
from unstructured.ingest.cli.interfaces import (
CliMixin,
CliPartitionConfig,
CliReadConfig,
)
from unstructured.ingest.interfaces import BaseConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import runner_map
@dataclass
class AzureCognitiveSearchCliWriteConfig(BaseConfig, CliMixin):
key: str
endpoint: str
index: str
@staticmethod
def add_cli_options(cmd: click.Command) -> None:
options = [
click.Option(
["--key"],
required=True,
type=str,
help="Key credential used for authenticating to an Azure service.",
envvar="AZURE_SEARCH_API_KEY",
show_envvar=True,
),
click.Option(
["--endpoint"],
required=True,
type=str,
help="The URL endpoint of an Azure search service. "
"In the form of https://{{service_name}}.search.windows.net",
envvar="AZURE_SEARCH_ENDPOINT",
show_envvar=True,
),
click.Option(
["--index"],
required=True,
type=str,
help="The name of the index to connect to",
),
]
cmd.params.extend(options)
@click.command(name="azure-cognitive-search")
@click.pass_context
def azure_cognitive_search_dest(ctx: click.Context, **options):
if not ctx.parent:
raise click.ClickException("destination command called without a parent")
if not ctx.parent.info_name:
raise click.ClickException("parent command missing info name")
source_cmd = ctx.parent.info_name.replace("-", "_")
runner_fn = runner_map[source_cmd]
parent_options: dict = ctx.parent.params if ctx.parent else {}
conform_click_options(options)
conform_click_options(parent_options)
verbose = parent_options.get("verbose", False)
ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO)
log_options(parent_options, verbose=verbose)
log_options(options, verbose=verbose)
try:
read_config = CliReadConfig.from_dict(parent_options)
partition_config = CliPartitionConfig.from_dict(parent_options)
# Run for schema validation
AzureCognitiveSearchCliWriteConfig.from_dict(options)
runner_fn(
read_config=read_config,
partition_config=partition_config,
writer_type="azure_cognitive_search",
writer_kwargs=options,
**parent_options,
)
except Exception as e:
logger.error(e, exc_info=True)
raise click.ClickException(str(e)) from e
def get_dest_cmd() -> click.Command:
cmd = azure_cognitive_search_dest
AzureCognitiveSearchCliWriteConfig.add_cli_options(cmd)
return cmd


@ -16,6 +16,7 @@ from unstructured.ingest.cli.interfaces import (
from unstructured.ingest.interfaces import BaseConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import delta_table as delta_table_fn
from unstructured.ingest.runner import runner_map
@dataclass
@ -108,14 +109,15 @@ class DeltaTableCliWriteConfig(BaseConfig, CliMixin):
@click.command(name="delta-table")
@click.pass_context
def delta_table_dest(ctx: click.Context, **options):
if not ctx.parent:
raise click.ClickException("destination command called without a parent")
if not ctx.parent.info_name:
raise click.ClickException("parent command missing info name")
source_cmd = ctx.parent.info_name.replace("-", "_")
runner_fn = runner_map[source_cmd]
parent_options: dict = ctx.parent.params if ctx.parent else {}
# Click sets all multiple fields as tuple, this needs to be updated to list
for k, v in options.items():
if isinstance(v, tuple):
options[k] = list(v)
for k, v in parent_options.items():
if isinstance(v, tuple):
parent_options[k] = list(v)
conform_click_options(options)
conform_click_options(parent_options)
verbose = parent_options.get("verbose", False)
ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO)
log_options(parent_options, verbose=verbose)
@ -127,7 +129,7 @@ def delta_table_dest(ctx: click.Context, **options):
# Run for schema validation
DeltaTableCliConfig.from_dict(options)
DeltaTableCliWriteConfig.from_dict(options)
delta_table_fn(
runner_fn(
read_config=read_config,
partition_config=partition_config,
writer_type="delta_table",


@ -0,0 +1,123 @@
import json
import typing as t
import uuid
from dataclasses import dataclass
import azure.core.exceptions
from unstructured.ingest.error import WriteError
from unstructured.ingest.interfaces import (
BaseConnectorConfig,
BaseDestinationConnector,
BaseIngestDoc,
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.utils import requires_dependencies
@dataclass
class SimpleAzureCognitiveSearchStorageConfig(BaseConnectorConfig):
endpoint: str
key: str
@dataclass
class AzureCognitiveSearchWriteConfig(WriteConfig):
index: str
@dataclass
class AzureCognitiveSearchDestinationConnector(BaseDestinationConnector):
write_config: AzureCognitiveSearchWriteConfig
connector_config: SimpleAzureCognitiveSearchStorageConfig
@requires_dependencies(["azure"], extras="azure-cognitive-search")
def initialize(self):
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
# Create a client
credential = AzureKeyCredential(self.connector_config.key)
self.client = SearchClient(
endpoint=self.connector_config.endpoint,
index_name=self.write_config.index,
credential=credential,
)
def conform_dict(self, data: dict) -> None:
"""
updates the dictionary that is from each Element being converted into a dict/json
into a dictionary that conforms to the schema expected by the
Azure Cognitive Search index
"""
from dateutil import parser # type: ignore
data["id"] = str(uuid.uuid4())
if points := data.get("metadata", {}).get("coordinates", {}).get("points"):
data["metadata"]["coordinates"]["points"] = json.dumps(points)
if version := data.get("metadata", {}).get("data_source", {}).get("version"):
data["metadata"]["data_source"]["version"] = str(version)
if record_locator := data.get("metadata", {}).get("data_source", {}).get("record_locator"):
data["metadata"]["data_source"]["record_locator"] = json.dumps(record_locator)
if last_modified := data.get("metadata", {}).get("last_modified"):
data["metadata"]["last_modified"] = parser.parse(last_modified).strftime(
"%Y-%m-%dT%H:%M:%S.%fZ",
)
if date_created := data.get("metadata", {}).get("data_source", {}).get("date_created"):
data["metadata"]["data_source"]["date_created"] = parser.parse(date_created).strftime(
"%Y-%m-%dT%H:%M:%S.%fZ",
)
if date_modified := data.get("metadata", {}).get("data_source", {}).get("date_modified"):
data["metadata"]["data_source"]["date_modified"] = parser.parse(date_modified).strftime(
"%Y-%m-%dT%H:%M:%S.%fZ",
)
if date_processed := data.get("metadata", {}).get("data_source", {}).get("date_processed"):
data["metadata"]["data_source"]["date_processed"] = parser.parse(
date_processed,
).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
if regex_metadata := data.get("metadata", {}).get("regex_metadata"):
data["metadata"]["regex_metadata"] = json.dumps(regex_metadata)
if page_number := data.get("metadata", {}).get("page_number"):
data["metadata"]["page_number"] = str(page_number)
def write(self, docs: t.List[BaseIngestDoc]) -> None:
json_list = []
for doc in docs:
local_path = doc._output_filename
with open(local_path) as json_file:
# TODO element id not a sufficient unique id to use
json_content = json.load(json_file)
for content in json_content:
self.conform_dict(data=content)
logger.info(
f"appending {len(json_content)} json elements from content in {local_path}",
)
json_list.extend(json_content)
logger.info(
f"writing {len(json_list)} documents to destination "
f"index at {self.write_config.index}",
)
try:
results = self.client.upload_documents(documents=json_list)
except azure.core.exceptions.HttpResponseError as http_error:
raise WriteError(f"http error: {http_error}") from http_error
errors = []
success = []
for result in results:
if result.succeeded:
success.append(result)
else:
errors.append(result)
logger.debug(f"results: {len(success)} successes, {len(errors)} failures")
if errors:
raise WriteError(
", ".join(
[
f"{error.key}: [{error.status_code}] {error.error_message}"
for error in errors
],
),
)


@ -37,5 +37,9 @@ class EmbeddingEncoderConnectionError(CustomError):
error_string = "Error in connecting to the embedding model provider: {}"
class WriteError(CustomError):
error_string = "Error in writing to downstream data source: {}"
class PartitionError(CustomError):
error_string = "Error in partitioning content: {}"


@ -1,3 +1,5 @@
import typing as t
from .airtable import airtable
from .azure import azure
from .biomed import biomed
@ -24,6 +26,35 @@ from .sharepoint import sharepoint
from .slack import slack
from .wikipedia import wikipedia
runner_map: t.Dict[str, t.Callable] = {
"airtable": airtable,
"azure": azure,
"biomed": biomed,
"box": box,
"confluence": confluence,
"delta_table": delta_table,
"discord": discord,
"dropbox": dropbox,
"elasticsearch": elasticsearch,
"fsspec": fsspec,
"gcs": gcs,
"github": github,
"gitlab": gitlab,
"gdrive": gdrive,
"google_drive": gdrive,
"jira": jira,
"local": local,
"notion": notion,
"onedrive": onedrive,
"outlook": outlook,
"reddit": reddit,
"s3": s3,
"salesforce": salesforce,
"sharepoint": sharepoint,
"slack": slack,
"wikipedia": wikipedia,
}
__all__ = [
"airtable",
"azure",
@ -50,4 +81,5 @@ __all__ = [
"sharepoint",
"slack",
"wikipedia",
"runner_map",
]


@ -25,6 +25,29 @@ def s3_writer(
)
@requires_dependencies(["azure"], extras="azure-cognitive-search")
def azure_cognitive_search_writer(
endpoint: str,
key: str,
index: str,
):
from unstructured.ingest.connector.azure_cognitive_search import (
AzureCognitiveSearchDestinationConnector,
AzureCognitiveSearchWriteConfig,
SimpleAzureCognitiveSearchStorageConfig,
)
return AzureCognitiveSearchDestinationConnector(
write_config=AzureCognitiveSearchWriteConfig(
index=index,
),
connector_config=SimpleAzureCognitiveSearchStorageConfig(
endpoint=endpoint,
key=key,
),
)
@requires_dependencies(["deltalake"], extras="delta-table")
def delta_table_writer(
table_uri: t.Union[str, Path],
@ -51,4 +74,5 @@ def delta_table_writer(
writer_map: t.Dict[str, t.Callable] = {
"s3": s3_writer,
"delta_table": delta_table_writer,
"azure_cognitive_search": azure_cognitive_search_writer,
}