From bd49cfbab70af8b341b84335829cc342a5c4f439 Mon Sep 17 00:00:00 2001 From: Roman Isecke <136338424+rbiseck3@users.noreply.github.com> Date: Mon, 25 Sep 2023 10:27:42 -0400 Subject: [PATCH] feat: adds Azure Cognitive Search (full text) destination connector (#1459) ### Description New [Azure Cognitive Search](https://azure.microsoft.com/en-us/products/ai-services/cognitive-search) destination connector added. Each json element from the json files created via partition is written as a document to the destination index. **Bonus bug fix:** Due to a recent change where the default version of python used in the repo was bumped to `3.10` from `3.8`, running `pip-compile` now resolves dependencies against that version rather than the lowest we support, which is still `3.8`. This breaks the setup for those lower versions because some of the versions pulled in by `pip-compile` exist for `3.10` but not `3.8`. `pip-compile` was updated to run as a script that first checks the version of python being used, which helps guarantee that all dependencies meet the minimum python version requirement. Closes out https://github.com/Unstructured-IO/unstructured/issues/1466 --- .github/workflows/ci.yml | 2 + .../ingest-test-fixtures-update-pr.yml | 2 + CHANGELOG.md | 1 + Makefile | 9 +- docs/source/destination_connectors.rst | 1 + .../azure_cognitive_sample_index_schema.json | 166 ++++++++++++++++++ .../azure_cognitive_search.rst | 81 +++++++++ docs/source/downstream_connectors.rst | 11 -- .../ingest/azure_cognitive_search/ingest.sh | 22 +++ requirements/constraints.in | 2 +- requirements/dev.txt | 5 +- requirements/ingest-azure-cognitive-search.in | 3 + .../ingest-azure-cognitive-search.txt | 56 ++++++ requirements/ingest-elasticsearch.txt | 2 +- scripts/pip-compile.sh | 18 ++ setup.py | 3 + .../files/azure_cognitive_index_schema.json | 165 +++++++++++++++++ .../test-ingest-azure-cognitive-search.sh | 85 +++++++++ unstructured/ingest/cli/cmds/__init__.py | 3 +- .../ingest/cli/cmds/azure_cognitive_search.py | 92 ++++++++++ unstructured/ingest/cli/cmds/delta_table.py | 18 +- .../connector/azure_cognitive_search.py | 123 +++++++++++++ unstructured/ingest/error.py | 4 + unstructured/ingest/runner/__init__.py | 32 ++++ unstructured/ingest/runner/writers.py | 24 +++ 25 files changed, 899 insertions(+), 31 deletions(-) create mode 100644 docs/source/destination_connectors/azure_cognitive_sample_index_schema.json create mode 100644 docs/source/destination_connectors/azure_cognitive_search.rst delete mode 100644 docs/source/downstream_connectors.rst create mode 100755 examples/ingest/azure_cognitive_search/ingest.sh create mode 100644 requirements/ingest-azure-cognitive-search.in create mode 100644 requirements/ingest-azure-cognitive-search.txt create mode 100755 scripts/pip-compile.sh create mode 100644 test_unstructured_ingest/files/azure_cognitive_index_schema.json create mode 100755 test_unstructured_ingest/test-ingest-azure-cognitive-search.sh create mode 100644 unstructured/ingest/cli/cmds/azure_cognitive_search.py create mode 100644 unstructured/ingest/connector/azure_cognitive_search.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 71dd034ae..ced9617dc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -289,6 +289,8 @@ jobs: NOTION_API_KEY: ${{ secrets.NOTION_API_KEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AZURE_SEARCH_ENDPOINT: ${{ secrets.AZURE_SEARCH_ENDPOINT }} + AZURE_SEARCH_API_KEY: ${{ secrets.AZURE_SEARCH_API_KEY }}
TABLE_OCR: "tesseract" ENTIRE_PAGE_OCR: "tesseract" run: | diff --git a/.github/workflows/ingest-test-fixtures-update-pr.yml b/.github/workflows/ingest-test-fixtures-update-pr.yml index c78de9cd0..7ca7d242f 100644 --- a/.github/workflows/ingest-test-fixtures-update-pr.yml +++ b/.github/workflows/ingest-test-fixtures-update-pr.yml @@ -86,6 +86,8 @@ jobs: NOTION_API_KEY: ${{ secrets.NOTION_API_KEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AZURE_SEARCH_ENDPOINT: ${{ secrets.AZURE_SEARCH_ENDPOINT }} + AZURE_SEARCH_API_KEY: ${{ secrets.AZURE_SEARCH_API_KEY }} TABLE_OCR: "tesseract" ENTIRE_PAGE_OCR: "tesseract" OVERWRITE_FIXTURES: "true" run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index c1c72179a..26414ce65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * **Adds data source properties to SharePoint, Outlook, Onedrive, Reddit, and Slack connectors** These properties (date_created, date_modified, version, source_url, record_locator) are written to element metadata during ingest, mapping elements to information about the document source from which they derive. This functionality enables downstream applications to reveal source document applications, e.g. a link to a GDrive doc, Salesforce record, etc. * **Add functionality to save embedded images in PDF's separately as images** This allows users to save embedded images in PDF's separately as images, given some directory path. The saved image path is written to the metadata for the Image element. Downstream applications may benefit by providing users with image links from relevant "hits." +* **Azure Cognitive Search destination connector** New Azure Cognitive Search destination connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned data from over 20 data sources (so far) to an Azure Cognitive Search index. ### Features diff --git a/Makefile b/Makefile index 51ede9205..061b7a9b0 100644 --- a/Makefile +++ b/Makefile @@ -221,14 +221,7 @@ install-paddleocr: ## pip-compile: compiles all base/dev/test requirements .PHONY: pip-compile pip-compile: - @for file in $(shell ls requirements/*.in); do \ - if [[ "$${file}" =~ "constraints" ]]; then \ - continue; \ - fi; \ - echo "running: pip-compile --upgrade $${file}"; \ - pip-compile --upgrade $${file}; \ - done - cp requirements/build.txt docs/requirements.txt + @scripts/pip-compile.sh diff --git a/docs/source/destination_connectors.rst b/docs/source/destination_connectors.rst index d741bd236..fa17ca5a1 100644 --- a/docs/source/destination_connectors.rst +++ b/docs/source/destination_connectors.rst @@ -9,3 +9,4 @@ in our community `Slack. azure-cognitive-search --help``. + +NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide `_. + +Sample Index Schema +------------------- +To make sure the schema of the index matches the data being written to it, a sample schema JSON can be used: + +
+.. literalinclude:: azure_cognitive_sample_index_schema.json +   :language: json +   :linenos: +   :caption: Object description diff --git a/docs/source/downstream_connectors.rst b/docs/source/downstream_connectors.rst deleted file mode 100644 index 62d7335ad..000000000 --- a/docs/source/downstream_connectors.rst +++ /dev/null @@ -1,11 +0,0 @@ -Downstream Connectors -=================== - -Connect to your favorite data storage platforms for effortless batch processing of your files. -We are constantly adding new data connectors and if you don't see your favorite platform let us know -in our community `Slack. `_ - -.. toctree:: - :maxdepth: 1 - - downstream_connectors/delta_table diff --git a/examples/ingest/azure_cognitive_search/ingest.sh b/examples/ingest/azure_cognitive_search/ingest.sh new file mode 100755 index 000000000..b8d4186ca --- /dev/null +++ b/examples/ingest/azure_cognitive_search/ingest.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +# Processes all the files from s3://utic-dev-tech-fixtures/small-pdf-set/, +# writing the partitioned elements to an Azure Cognitive Search index. + +# Structured outputs are stored in s3-small-batch-output-to-azure/ + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +cd "$SCRIPT_DIR"/../../.. || exit 1 + +PYTHONPATH=. ./unstructured/ingest/main.py \ + s3 \ + --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ + --anonymous \ + --output-dir s3-small-batch-output-to-azure \ + --num-processes 2 \ + --verbose \ + --strategy fast \ + azure-cognitive-search \ + --key "$AZURE_SEARCH_API_KEY" \ + --endpoint "$AZURE_SEARCH_ENDPOINT" \ + --index utic-test-ingest-fixtures-output diff --git a/requirements/constraints.in b/requirements/constraints.in index eedfebd32..2cd8ee993 100644 --- a/requirements/constraints.in +++ b/requirements/constraints.in @@ -37,4 +37,4 @@ weaviate-client==3.23.2 # Note(yuming) - pining to avoid conflict with paddle install matplotlib==3.7.2 # NOTE(crag) - pin to available pandas for python 3.8 (at least in CI) -pandas<2.1.1 +pandas<2.0.4 diff --git a/requirements/dev.txt b/requirements/dev.txt index 98f6d6883..fb7d65fb4 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -69,7 +69,9 @@ defusedxml==0.7.1 distlib==0.3.7 # via virtualenv exceptiongroup==1.1.3 - # via anyio + # via + # -c requirements/test.txt + # anyio executing==1.2.0 # via stack-data fastjsonschema==2.18.0 @@ -344,6 +346,7 @@ tinycss2==1.2.1 # via nbconvert tomli==2.0.1 # via + # -c requirements/test.txt # build # jupyterlab # pip-tools diff --git a/requirements/ingest-azure-cognitive-search.in b/requirements/ingest-azure-cognitive-search.in new file mode 100644 index 000000000..25514d3f6 --- /dev/null +++ b/requirements/ingest-azure-cognitive-search.in @@ -0,0 +1,3 @@ +-c constraints.in +-c base.txt +azure-search-documents diff --git a/requirements/ingest-azure-cognitive-search.txt b/requirements/ingest-azure-cognitive-search.txt new file mode 100644 index 000000000..8794ebfa9 --- /dev/null +++ b/requirements/ingest-azure-cognitive-search.txt @@ -0,0 +1,56 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# pip-compile requirements/ingest-azure-cognitive-search.in +# +azure-common==1.1.28 + # via azure-search-documents +azure-core==1.29.4 + # via + # azure-search-documents + # msrest +azure-search-documents==11.3.0 + # via -r requirements/ingest-azure-cognitive-search.in +certifi==2023.7.22 + # via + # -c requirements/base.txt + # -c requirements/constraints.in + # msrest + # requests +charset-normalizer==3.2.0 + #
via + # -c requirements/base.txt + # requests +idna==3.4 + # via + # -c requirements/base.txt + # requests +isodate==0.6.1 + # via msrest +msrest==0.7.1 + # via azure-search-documents +oauthlib==3.2.2 + # via requests-oauthlib +requests==2.31.0 + # via + # -c requirements/base.txt + # azure-core + # msrest + # requests-oauthlib +requests-oauthlib==1.3.1 + # via msrest +six==1.16.0 + # via + # azure-core + # isodate +typing-extensions==4.8.0 + # via + # -c requirements/base.txt + # azure-core + # azure-search-documents +urllib3==1.26.16 + # via + # -c requirements/base.txt + # -c requirements/constraints.in + # requests diff --git a/requirements/ingest-elasticsearch.txt b/requirements/ingest-elasticsearch.txt index 93ed77150..c7a6cb3c0 100644 --- a/requirements/ingest-elasticsearch.txt +++ b/requirements/ingest-elasticsearch.txt @@ -11,7 +11,7 @@ certifi==2023.7.22 # elastic-transport elastic-transport==8.4.0 # via elasticsearch -elasticsearch==8.9.0 +elasticsearch==8.10.0 # via -r requirements/ingest-elasticsearch.in jq==1.6.0 # via -r requirements/ingest-elasticsearch.in diff --git a/scripts/pip-compile.sh b/scripts/pip-compile.sh new file mode 100755 index 000000000..0bc23e11d --- /dev/null +++ b/scripts/pip-compile.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash + +# python version must match lowest supported (3.8) +major=3 +minor=8 +if ! python -c "import sys; assert sys.version_info.major == $major and sys.version_info.minor == $minor"; then + echo "python version not equal to expected $major.$minor: $(python --version)" + exit 1 +fi + +for file in requirements/*.in; do + if [[ "$file" =~ "constraints" ]]; then + continue; + fi; + echo "running: pip-compile --upgrade $file" + pip-compile --upgrade "$file" +done +cp requirements/build.txt docs/requirements.txt diff --git a/setup.py b/setup.py index 407d94fdc..4b08e3c07 100644 --- a/setup.py +++ b/setup.py @@ -129,6 +129,9 @@ setup( # Extra requirements for data connectors "s3": load_requirements("requirements/ingest-s3.in"), "azure": load_requirements("requirements/ingest-azure.in"), + "azure-cognitive-search": load_requirements( + "requirements/ingest-azure-cognitive-search.in", + ), "biomed": load_requirements("requirements/ingest-biomed.in"), "discord": load_requirements("requirements/ingest-discord.in"), "github": load_requirements("requirements/ingest-github.in"), diff --git a/test_unstructured_ingest/files/azure_cognitive_index_schema.json b/test_unstructured_ingest/files/azure_cognitive_index_schema.json new file mode 100644 index 000000000..c15cbcc73 --- /dev/null +++ b/test_unstructured_ingest/files/azure_cognitive_index_schema.json @@ -0,0 +1,165 @@ +{ + "@odata.context": "https://utic-test-ingest-fixtures.search.windows.net/$metadata#indexes/$entity", + "@odata.etag": "\"0x8DBB93E09C8F4BD\"", + "fields": [ + { + "name": "id", + "type": "Edm.String", + "key": true + }, + { + "name": "element_id", + "type": "Edm.String" + }, + { + "name": "text", + "type": "Edm.String" + }, + { + "name": "type", + "type": "Edm.String" + }, + { + "name": "metadata", + "type": "Edm.ComplexType", + "fields": [ + { + "name": "category_depth", + "type": "Edm.Int32" + }, + { + "name": "parent_id", + "type": "Edm.String" + }, + { + "name": "attached_to_filename", + "type": "Edm.String" + }, + { + "name": "filetype", + "type": "Edm.String" + }, + { + "name": "last_modified", + "type": "Edm.DateTimeOffset" + }, + { + "name": "file_directory", + "type": "Edm.String" + }, + { + "name": "filename", + "type": "Edm.String" + }, + { + "name": "data_source", + 
"type": "Edm.ComplexType", + "fields": [ + { + "name": "url", + "type": "Edm.String" + }, + { + "name": "version", + "type": "Edm.String" + }, + { + "name": "date_created", + "type": "Edm.DateTimeOffset" + }, + { + "name": "date_modified", + "type": "Edm.DateTimeOffset" + }, + { + "name": "date_processed", + "type": "Edm.DateTimeOffset" + }, + { + "name": "record_locator", + "type": "Edm.String" + } + ] + }, + { + "name": "coordinates", + "type": "Edm.ComplexType", + "fields": [ + { + "name": "system", + "type": "Edm.String" + }, + { + "name": "layout_width", + "type": "Edm.Double" + }, + { + "name": "layout_height", + "type": "Edm.Double" + }, + { + "name": "points", + "type": "Edm.String" + } + ] + }, + { + "name": "page_number", + "type": "Edm.String" + }, + { + "name": "url", + "type": "Edm.String" + }, + { + "name": "link_urls", + "type": "Collection(Edm.String)" + }, + { + "name": "link_texts", + "type": "Collection(Edm.String)" + }, + { + "name": "sent_from", + "type": "Collection(Edm.String)" + }, + { + "name": "sent_to", + "type": "Collection(Edm.String)" + }, + { + "name": "subject", + "type": "Edm.String" + }, + { + "name": "section", + "type": "Edm.String" + }, + { + "name": "header_footer_type", + "type": "Edm.String" + }, + { + "name": "emphasized_text_contents", + "type": "Collection(Edm.String)" + }, + { + "name": "emphasized_text_tags", + "type": "Collection(Edm.String)" + }, + { + "name": "text_as_html", + "type": "Edm.String" + }, + { + "name": "regex_metadata", + "type": "Edm.String" + }, + { + "name": "detection_class_prob", + "type": "Edm.Double" + } + ] + } + ] +} diff --git a/test_unstructured_ingest/test-ingest-azure-cognitive-search.sh b/test_unstructured_ingest/test-ingest-azure-cognitive-search.sh new file mode 100755 index 000000000..c40bccc56 --- /dev/null +++ b/test_unstructured_ingest/test-ingest-azure-cognitive-search.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash + +set -e + +SCRIPT_DIR=$(dirname "$(realpath "$0")") +cd "$SCRIPT_DIR"/.. || exit 1 +OUTPUT_FOLDER_NAME=s3 +OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME +DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME +DESTINATION_INDEX="utic-test-ingest-fixtures-output-$(date +%s)" +API_VERSION=2020-06-30 + +if [ -z "$AZURE_SEARCH_ENDPOINT" ] && [ -z "$AZURE_SEARCH_API_KEY" ]; then + echo "Skipping Azure Cognitive Search ingest test because neither AZURE_SEARCH_ENDPOINT nor AZURE_SEARCH_API_KEY env vars are set." 
+  exit 0 +fi + +function cleanup { + response_code=$(curl -s -o /dev/null -w "%{http_code}" \ + "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \ + --header "api-key: $AZURE_SEARCH_API_KEY" \ + --header 'content-type: application/json') + if [ "$response_code" == "200" ]; then + echo "deleting index $DESTINATION_INDEX" + curl -X DELETE \ + "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \ + --header "api-key: $AZURE_SEARCH_API_KEY" \ + --header 'content-type: application/json' + else + echo "Index $DESTINATION_INDEX does not exist, nothing to delete" + fi +} + +trap cleanup EXIT + + +# Create index +echo "Creating index $DESTINATION_INDEX" +response_code=$(curl -s -o /dev/null -w "%{http_code}" -X PUT \ +"https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX?api-version=$API_VERSION" \ +--header "api-key: $AZURE_SEARCH_API_KEY" \ +--header 'content-type: application/json' \ +--data "@$SCRIPT_DIR/files/azure_cognitive_index_schema.json") + +if [ "$response_code" -lt 400 ]; then + echo "Index creation success: $response_code" +else + echo "Index creation failure: $response_code" + exit 1 +fi + +PYTHONPATH=. ./unstructured/ingest/main.py \ + s3 \ + --download-dir "$DOWNLOAD_DIR" \ + --metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified,metadata.detection_class_prob,metadata.parent_id,metadata.category_depth \ + --strategy fast \ + --preserve-downloads \ + --reprocess \ + --output-dir "$OUTPUT_DIR" \ + --verbose \ + --remote-url s3://utic-dev-tech-fixtures/small-pdf-set/ \ + --anonymous \ + azure-cognitive-search \ + --key "$AZURE_SEARCH_API_KEY" \ + --endpoint "$AZURE_SEARCH_ENDPOINT" \ + --index "$DESTINATION_INDEX" + +echo "sleeping 5 seconds to let index finish catching up after writes" +sleep 5 + +# Check the contents of the index +docs_count=$(curl "https://utic-test-ingest-fixtures.search.windows.net/indexes/$DESTINATION_INDEX/docs/\$count?api-version=$API_VERSION" \ + --header "api-key: $AZURE_SEARCH_API_KEY" \ + --header 'content-type: application/json' | jq) + +expected_docs_count=0 +for i in $(jq length "$OUTPUT_DIR"/*); do + expected_docs_count=$((expected_docs_count+i)); +done + + +if [ "$docs_count" -ne "$expected_docs_count" ]; then + echo "Number of docs $docs_count doesn't match the expected docs: $expected_docs_count" + exit 1 +fi diff --git a/unstructured/ingest/cli/cmds/__init__.py b/unstructured/ingest/cli/cmds/__init__.py index f72dd80ac..9c5123ab5 100644 --- a/unstructured/ingest/cli/cmds/__init__.py +++ b/unstructured/ingest/cli/cmds/__init__.py @@ -4,6 +4,7 @@ import click from .airtable import get_source_cmd as airtable_src from .azure import get_source_cmd as azure_src +from .azure_cognitive_search import get_dest_cmd as azure_cognitive_search_dest from .biomed import get_source_cmd as biomed_src from .box import get_source_cmd as box_src from .confluence import get_source_cmd as confluence_src @@ -58,7 +59,7 @@ src: t.List[click.Group] = [ wikipedia_src(), ] -dest: t.List[click.Command] = [s3_dest(), delta_table_dest()] +dest: t.List[click.Command] = [azure_cognitive_search_dest(), s3_dest(), delta_table_dest()] __all__ = [ "src", diff --git a/unstructured/ingest/cli/cmds/azure_cognitive_search.py b/unstructured/ingest/cli/cmds/azure_cognitive_search.py new file mode 100644
index 000000000..a426a68c7 --- /dev/null +++ b/unstructured/ingest/cli/cmds/azure_cognitive_search.py @@ -0,0 +1,92 @@ +import logging +from dataclasses import dataclass + +import click + +from unstructured.ingest.cli.cmds.utils import conform_click_options +from unstructured.ingest.cli.common import ( + log_options, +) +from unstructured.ingest.cli.interfaces import ( + CliMixin, + CliPartitionConfig, + CliReadConfig, +) +from unstructured.ingest.interfaces import BaseConfig +from unstructured.ingest.logger import ingest_log_streaming_init, logger +from unstructured.ingest.runner import runner_map + + +@dataclass +class AzureCognitiveSearchCliWriteConfig(BaseConfig, CliMixin): + key: str + endpoint: str + index: str + + @staticmethod + def add_cli_options(cmd: click.Command) -> None: + options = [ + click.Option( + ["--key"], + required=True, + type=str, + help="Key credential used for authenticating to an Azure service.", + envvar="AZURE_SEARCH_API_KEY", + show_envvar=True, + ), + click.Option( + ["--endpoint"], + required=True, + type=str, + help="The URL endpoint of an Azure search service. " + "In the form of https://{{service_name}}.search.windows.net", + envvar="AZURE_SEARCH_ENDPOINT", + show_envvar=True, + ), + click.Option( + ["--index"], + required=True, + type=str, + help="The name of the index to connect to", + ), + ] + cmd.params.extend(options) + + +@click.command(name="azure-cognitive-search") +@click.pass_context +def azure_cognitive_search_dest(ctx: click.Context, **options): + if not ctx.parent: + raise click.ClickException("destination command called without a parent") + if not ctx.parent.info_name: + raise click.ClickException("parent command missing info name") + source_cmd = ctx.parent.info_name.replace("-", "_") + runner_fn = runner_map[source_cmd] + parent_options: dict = ctx.parent.params if ctx.parent else {} + conform_click_options(options) + conform_click_options(parent_options) + verbose = parent_options.get("verbose", False) + ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) + log_options(parent_options, verbose=verbose) + log_options(options, verbose=verbose) + try: + read_config = CliReadConfig.from_dict(parent_options) + partition_config = CliPartitionConfig.from_dict(parent_options) + # Run for schema validation + AzureCognitiveSearchCliWriteConfig.from_dict(options) + runner_fn( + read_config=read_config, + partition_config=partition_config, + writer_type="azure_cognitive_search", + writer_kwargs=options, + **parent_options, + ) + except Exception as e: + logger.error(e, exc_info=True) + raise click.ClickException(str(e)) from e + + +def get_dest_cmd() -> click.Command: + cmd = azure_cognitive_search_dest + AzureCognitiveSearchCliWriteConfig.add_cli_options(cmd) + return cmd diff --git a/unstructured/ingest/cli/cmds/delta_table.py b/unstructured/ingest/cli/cmds/delta_table.py index e31efe308..b3af8a84d 100644 --- a/unstructured/ingest/cli/cmds/delta_table.py +++ b/unstructured/ingest/cli/cmds/delta_table.py @@ -16,6 +16,7 @@ from unstructured.ingest.cli.interfaces import ( from unstructured.ingest.interfaces import BaseConfig from unstructured.ingest.logger import ingest_log_streaming_init, logger from unstructured.ingest.runner import delta_table as delta_table_fn +from unstructured.ingest.runner import runner_map @dataclass @@ -108,14 +109,15 @@ class DeltaTableCliWriteConfig(BaseConfig, CliMixin): @click.command(name="delta-table") @click.pass_context def delta_table_dest(ctx: click.Context, **options): + if not ctx.parent: + raise 
click.ClickException("destination command called without a parent") + if not ctx.parent.info_name: + raise click.ClickException("parent command missing info name") + source_cmd = ctx.parent.info_name.replace("-", "_") + runner_fn = runner_map[source_cmd] parent_options: dict = ctx.parent.params if ctx.parent else {} - # Click sets all multiple fields as tuple, this needs to be updated to list - for k, v in options.items(): - if isinstance(v, tuple): - options[k] = list(v) - for k, v in parent_options.items(): - if isinstance(v, tuple): - parent_options[k] = list(v) + conform_click_options(options) + conform_click_options(parent_options) verbose = parent_options.get("verbose", False) ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO) log_options(parent_options, verbose=verbose) @@ -127,7 +129,7 @@ def delta_table_dest(ctx: click.Context, **options): # Run for schema validation DeltaTableCliConfig.from_dict(options) DeltaTableCliWriteConfig.from_dict(options) - delta_table_fn( + runner_fn( read_config=read_config, partition_config=partition_config, writer_type="delta_table", diff --git a/unstructured/ingest/connector/azure_cognitive_search.py b/unstructured/ingest/connector/azure_cognitive_search.py new file mode 100644 index 000000000..7b3e7f071 --- /dev/null +++ b/unstructured/ingest/connector/azure_cognitive_search.py @@ -0,0 +1,123 @@ +import json +import typing as t +import uuid +from dataclasses import dataclass + +import azure.core.exceptions + +from unstructured.ingest.error import WriteError +from unstructured.ingest.interfaces import ( + BaseConnectorConfig, + BaseDestinationConnector, + BaseIngestDoc, + WriteConfig, +) +from unstructured.ingest.logger import logger +from unstructured.utils import requires_dependencies + + +@dataclass +class SimpleAzureCognitiveSearchStorageConfig(BaseConnectorConfig): + endpoint: str + key: str + + +@dataclass +class AzureCognitiveSearchWriteConfig(WriteConfig): + index: str + + +@dataclass +class AzureCognitiveSearchDestinationConnector(BaseDestinationConnector): + write_config: AzureCognitiveSearchWriteConfig + connector_config: SimpleAzureCognitiveSearchStorageConfig + + @requires_dependencies(["azure"], extras="azure-cognitive-search") + def initialize(self): + from azure.core.credentials import AzureKeyCredential + from azure.search.documents import SearchClient + + # Create a client + credential = AzureKeyCredential(self.connector_config.key) + self.client = SearchClient( + endpoint=self.connector_config.endpoint, + index_name=self.write_config.index, + credential=credential, + ) + + def conform_dict(self, data: dict) -> None: + """ + updates the dictionary that is from each Element being converted into a dict/json + into a dictionary that conforms to the schema expected by the + Azure Cognitive Search index + """ + from dateutil import parser # type: ignore + + data["id"] = str(uuid.uuid4()) + + if points := data.get("metadata", {}).get("coordinates", {}).get("points"): + data["metadata"]["coordinates"]["points"] = json.dumps(points) + if version := data.get("metadata", {}).get("data_source", {}).get("version"): + data["metadata"]["data_source"]["version"] = str(version) + if record_locator := data.get("metadata", {}).get("data_source", {}).get("record_locator"): + data["metadata"]["data_source"]["record_locator"] = json.dumps(record_locator) + if last_modified := data.get("metadata", {}).get("last_modified"): + data["metadata"]["last_modified"] = parser.parse(last_modified).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) + if 
date_created := data.get("metadata", {}).get("data_source", {}).get("date_created"): + data["metadata"]["data_source"]["date_created"] = parser.parse(date_created).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) + if date_modified := data.get("metadata", {}).get("data_source", {}).get("date_modified"): + data["metadata"]["data_source"]["date_modified"] = parser.parse(date_modified).strftime( + "%Y-%m-%dT%H:%M:%S.%fZ", + ) + if date_processed := data.get("metadata", {}).get("data_source", {}).get("date_processed"): + data["metadata"]["data_source"]["date_processed"] = parser.parse( + date_processed, + ).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + if regex_metadata := data.get("metadata", {}).get("regex_metadata"): + data["metadata"]["regex_metadata"] = json.dumps(regex_metadata) + if page_number := data.get("metadata", {}).get("page_number"): + data["metadata"]["page_number"] = str(page_number) + + def write(self, docs: t.List[BaseIngestDoc]) -> None: + json_list = [] + for doc in docs: + local_path = doc._output_filename + with open(local_path) as json_file: + # TODO element id not a sufficient unique id to use + json_content = json.load(json_file) + for content in json_content: + self.conform_dict(data=content) + logger.info( + f"appending {len(json_content)} json elements from content in {local_path}", + ) + json_list.extend(json_content) + logger.info( + f"writing {len(json_list)} documents to destination " + f"index at {self.write_config.index}", + ) + try: + results = self.client.upload_documents(documents=json_list) + + except azure.core.exceptions.HttpResponseError as http_error: + raise WriteError(f"http error: {http_error}") from http_error + errors = [] + success = [] + for result in results: + if result.succeeded: + success.append(result) + else: + errors.append(result) + logger.debug(f"results: {len(success)} successes, {len(errors)} failures") + if errors: + raise WriteError( + ", ".join( + [ + f"{error.key}: [{error.status_code}] {error.error_message}" + for error in errors + ], + ), + ) diff --git a/unstructured/ingest/error.py b/unstructured/ingest/error.py index f1228edd2..9a3eebacf 100644 --- a/unstructured/ingest/error.py +++ b/unstructured/ingest/error.py @@ -37,5 +37,9 @@ class EmbeddingEncoderConnectionError(CustomError): error_string = "Error in connecting to the embedding model provider: {}" +class WriteError(CustomError): + error_string = "Error in writing to downstream data source: {}" + + class PartitionError(CustomError): error_string = "Error in partitioning content: {}" diff --git a/unstructured/ingest/runner/__init__.py b/unstructured/ingest/runner/__init__.py index a742d9b41..4dbe2316a 100644 --- a/unstructured/ingest/runner/__init__.py +++ b/unstructured/ingest/runner/__init__.py @@ -1,3 +1,5 @@ +import typing as t + from .airtable import airtable from .azure import azure from .biomed import biomed @@ -24,6 +26,35 @@ from .sharepoint import sharepoint from .slack import slack from .wikipedia import wikipedia +runner_map: t.Dict[str, t.Callable] = { + "airtable": airtable, + "azure": azure, + "biomed": biomed, + "box": box, + "confluence": confluence, + "delta_table": delta_table, + "discord": discord, + "dropbox": dropbox, + "elasticsearch": elasticsearch, + "fsspec": fsspec, + "gcs": gcs, + "github": github, + "gitlab": gitlab, + "gdrive": gdrive, + "google_drive": gdrive, + "jira": jira, + "local": local, + "notion": notion, + "onedrive": onedrive, + "outlook": outlook, + "reddit": reddit, + "s3": s3, + "salesforce": salesforce, + "sharepoint": sharepoint, + "slack": 
slack, + "wikipedia": wikipedia, +} + __all__ = [ "airtable", "azure", @@ -50,4 +81,5 @@ __all__ = [ "sharepoint", "slack", "wikipedia", + "runner_map", ] diff --git a/unstructured/ingest/runner/writers.py b/unstructured/ingest/runner/writers.py index e1daaf250..3201315bd 100644 --- a/unstructured/ingest/runner/writers.py +++ b/unstructured/ingest/runner/writers.py @@ -25,6 +25,29 @@ def s3_writer( ) +@requires_dependencies(["azure"], extras="azure-cognitive-search") +def azure_cognitive_search_writer( + endpoint: str, + key: str, + index: str, +): + from unstructured.ingest.connector.azure_cognitive_search import ( + AzureCognitiveSearchDestinationConnector, + AzureCognitiveSearchWriteConfig, + SimpleAzureCognitiveSearchStorageConfig, + ) + + return AzureCognitiveSearchDestinationConnector( + write_config=AzureCognitiveSearchWriteConfig( + index=index, + ), + connector_config=SimpleAzureCognitiveSearchStorageConfig( + endpoint=endpoint, + key=key, + ), + ) + + @requires_dependencies(["deltalake"], extras="delta-table") def delta_table_writer( table_uri: t.Union[str, Path], @@ -51,4 +74,5 @@ def delta_table_writer( writer_map: t.Dict[str, t.Callable] = { "s3": s3_writer, "delta_table": delta_table_writer, + "azure_cognitive_search": azure_cognitive_search_writer, }
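For reference, below is a minimal sketch of how the pieces added in this patch compose when used programmatically rather than through the CLI. It only uses names defined in this diff (`azure_cognitive_search_writer` and the connector it builds); the endpoint, key, and index values are placeholders, and the snippet assumes the `azure-cognitive-search` extra is installed:

```python
from unstructured.ingest.runner.writers import azure_cognitive_search_writer

# Placeholder values: in practice these come from the AZURE_SEARCH_ENDPOINT /
# AZURE_SEARCH_API_KEY env vars and the --index CLI option.
connector = azure_cognitive_search_writer(
    endpoint="https://my-service.search.windows.net",
    key="my-api-key",
    index="my-index",
)

# initialize() builds the SearchClient from the connector/write configs
# (the decorator on the writer requires the azure dependencies at call time).
connector.initialize()

# connector.write(docs) would then conform and upload the elements, where docs
# is the list of BaseIngestDoc objects produced by a source connector.
```

The upload path relies on `conform_dict` to make each element match the index schema above: an `id` is generated with `uuid4`, coordinates and record locators are serialized with `json.dumps`, and all dates are normalized to the `%Y-%m-%dT%H:%M:%S.%fZ` format expected by the `Edm.DateTimeOffset` fields.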