roman/update ingest pipeline docs (#1689)

### Description
* Update all existing connector docs to use the new pipeline approach

### Additional changes:
* Some defaults were set on the runners to match those in the configs,
making them easier to work with, e.g. the biomed runner:
```python
max_retries: int = 5,
max_request_time: int = 45,
decay: float = 0.3,
```
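With those defaults in place, the retry settings can simply be omitted when invoking the runner. A minimal sketch mirroring the updated biomed docs example further down in this change (paths and values taken from that example):
```python
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import BiomedRunner

# max_retries, max_request_time, and decay no longer need to be passed;
# the runner now falls back to the same defaults the config uses.
runner = BiomedRunner(
    processor_config=ProcessorConfig(
        verbose=True,
        output_dir="biomed-ingest-output",
        num_processes=2,
    ),
    read_config=ReadConfig(),
    partition_config=PartitionConfig(),
)
runner.run(path="oa_pdf/07/07/sbaa031.073.PMC7234218.pdf")
```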
Roman Isecke 2023-10-17 12:11:16 -04:00 committed by GitHub
parent 9ea3734fd0
commit adacd8e5b1
33 changed files with 574 additions and 333 deletions


@ -1,3 +1,11 @@
## 0.10.25-dev0
### Enhancements
### Features
### Fixes
## 0.10.24
### Enhancements


@ -31,18 +31,21 @@ Run Locally
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.airtable import airtable
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig, ProcessorConfig
from unstructured.ingest.runner import AirtableRunner
if __name__ == "__main__":
airtable(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = AirtableRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="airtable-ingest-output",
num_processes=2,
),
personal_access_token=os.getenv("AIRTABLE_PERSONAL_ACCESS_TOKEN"),
read_config=ReadConfig(),
partition_config=PartitionConfig()
)
runner.run(
personal_access_token=os.getenv("AIRTABLE_PERSONAL_ACCESS_TOKEN")
)
Run via the API
@ -72,19 +75,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.airtable import airtable
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import AirtableRunner
if __name__ == "__main__":
airtable(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = AirtableRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="airtable-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
personal_access_token=os.getenv("AIRTABLE_PERSONAL_ACCESS_TOKEN"),
)


@ -28,17 +28,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.azure import azure
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import AzureRunner
if __name__ == "__main__":
azure(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = AzureRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="azure-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
remote_url="abfs://container1/",
account_name="azureunstructured1",
)
@ -56,19 +61,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.azure import azure
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import AzureRunner
if __name__ == "__main__":
azure(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = AzureRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="azure-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
remote_url="abfs://container1/",
account_name="azureunstructured1",
)


@ -29,19 +29,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.biomed import biomed
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import BiomedRunner
if __name__ == "__main__":
biomed(
verbose=True,
read_config=ReadConfig(
preserve_downloads=True,
),
partition_config=PartitionConfig(
output_dir="biomed-ingest-output-path",
runner = BiomedRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="bioemt-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
path="oa_pdf/07/07/sbaa031.073.PMC7234218.pdf",
)
@ -72,21 +75,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.biomed import biomed
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import BiomedRunner
if __name__ == "__main__":
biomed(
verbose=True,
read_config=ReadConfig(
preserve_downloads=True,
),
partition_config=PartitionConfig(
output_dir="biomed-ingest-output-path",
runner = BiomedRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="bioemt-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
path="oa_pdf/07/07/sbaa031.073.PMC7234218.pdf",
)


@ -32,17 +32,20 @@ Run Locally
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.box import box
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import BoxRunner
if __name__ == "__main__":
box(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = BoxRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="box-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
box_app_config=os.getenv("BOX_APP_CONFIG_PATH"),
recursive=True,
remote_url="box://utic-test-ingest-fixtures",
@ -76,19 +79,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.box import box
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import BoxRunner
if __name__ == "__main__":
box(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = BoxRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="box-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
box_app_config=os.getenv("BOX_APP_CONFIG_PATH"),
recursive=True,
remote_url="box://utic-test-ingest-fixtures",


@ -30,18 +30,24 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.confluence import confluence
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import ConfluenceRunner
if __name__ == "__main__":
confluence(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = ConfluenceRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="confluence-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
metadata_exclude=["filename", "file_directory", "metadata.data_source.date_processed"],
),
)
runner.run(
url="https://unstructured-ingest-test.atlassian.net",
user_email="12345678@unstructured.io",
api_token="ABCDE1234ABDE1234ABCDE1234",
@ -75,20 +81,24 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.confluence import confluence
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import ConfluenceRunner
if __name__ == "__main__":
confluence(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = ConfluenceRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="confluence-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
metadata_exclude=["filename", "file_directory", "metadata.data_source.date_processed"],
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
url="https://unstructured-ingest-test.atlassian.net",
user_email="12345678@unstructured.io",
api_token="ABCDE1234ABDE1234ABCDE1234",


@ -29,19 +29,24 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.delta_table import delta_table
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import DeltaTableRunner
if __name__ == "__main__":
delta_table(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = DeltaTableRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="delta-table-example",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
table_uri="s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/",
storage_options="AWS_REGION=us-east-2,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY"
storage_options="AWS_REGION=us-east-2,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY",
)
@ -71,21 +76,25 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.delta_table import delta_table
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import DeltaTableRunner
if __name__ == "__main__":
delta_table(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = DeltaTableRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="delta-table-example",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
table_uri="s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/",
storage_options="AWS_REGION=us-east-2,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY"
storage_options="AWS_REGION=us-east-2,AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY",
)
Additionally, you will need to pass the ``--partition-endpoint`` if you're running the API locally. You can find more information about the ``unstructured`` API `here <https://github.com/Unstructured-IO/unstructured-api>`_.
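For the Python examples, the equivalent of that CLI flag would presumably be set on ``PartitionConfig``; the field name below is an assumption and may differ between versions:
```python
import os

from unstructured.ingest.interfaces import PartitionConfig

# Assumed Python-side equivalent of the --partition-endpoint CLI flag when the
# API runs locally; the partition_endpoint field name is an assumption.
partition_config = PartitionConfig(
    partition_by_api=True,
    api_key=os.getenv("UNSTRUCTURED_API_KEY"),
    partition_endpoint="http://localhost:8000",  # local unstructured-api instance
)
```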


@ -32,23 +32,22 @@ Run Locally
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.discord import discord
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import DiscordRunner
if __name__ == "__main__":
discord(
verbose=True,
read_config=ReadConfig(
download_dir="discord-ingest-download",
preserve_downloads=True,
),
partition_config=PartitionConfig(
output_dir="discord-example",
runner = DiscordRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="discord-ingest-example",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
channels=["12345678"],
token=os.getenv("DISCORD_TOKEN"),
period=None,
)
Run via the API
@ -79,25 +78,25 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.discord import discord
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import DiscordRunner
if __name__ == "__main__":
discord(
verbose=True,
read_config=ReadConfig(
download_dir="discord-ingest-download",
preserve_downloads=True,
),
partition_config=PartitionConfig(
output_dir="discord-example",
runner = DiscordRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="discord-ingest-example",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
channels=["12345678"],
token=os.getenv("DISCORD_TOKEN"),
period=None,
)
Additionally, you will need to pass the ``--partition-endpoint`` if you're running the API locally. You can find more information about the ``unstructured`` API `here <https://github.com/Unstructured-IO/unstructured-api>`_.


@ -32,17 +32,20 @@ Run Locally
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.dropbox import dropbox
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import DropboxRunner
if __name__ == "__main__":
dropbox(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = DropboxRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="dropbox-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
remote_url="dropbox:// /",
token=os.getenv("DROPBOX_TOKEN"),
recursive=True,
@ -76,19 +79,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.dropbox import dropbox
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import DropboxRunner
if __name__ == "__main__":
dropbox(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = DropboxRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="dropbox-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
remote_url="dropbox:// /",
token=os.getenv("DROPBOX_TOKEN"),
recursive=True,


@ -30,18 +30,24 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.elasticsearch import elasticsearch
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import ElasticSearchRunner
if __name__ == "__main__":
elasticsearch(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = ElasticSearchRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="elasticsearch-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
metadata_exclude=["filename", "file_directory", "metadata.data_source.date_processed"],
),
)
runner.run(
url="http://localhost:9200",
index_name="movies",
jq_query="{ethnicity, director, plot}",
@ -75,20 +81,24 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.elasticsearch import elasticsearch
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import ElasticSearchRunner
if __name__ == "__main__":
elasticsearch(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = ElasticSearchRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="elasticsearch-ingest-output",
num_processes=2,
metadata_exclude=["filename", "file_directory", "metadata.data_source.date_processed"],
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
metadata_exclude=["filename", "file_directory", "metadata.data_source.date_processed"],
),
)
runner.run(
url="http://localhost:9200",
index_name="movies",
jq_query="{ethnicity, director, plot}",


@ -29,17 +29,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.github import github
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import GithubRunner
if __name__ == "__main__":
github(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = GithubRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="github-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
url="Unstructured-IO/unstructured",
git_branch="main",
)
@ -71,19 +76,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.github import github
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import GithubRunner
if __name__ == "__main__":
github(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = GithubRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="github-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
url="Unstructured-IO/unstructured",
git_branch="main",
)


@ -29,17 +29,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.gitlab import gitlab
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import GitlabRunner
if __name__ == "__main__":
gitlab(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = GitlabRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="gitlab-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
url="https://gitlab.com/gitlab-com/content-sites/docsy-gitlab",
git_branch="v0.0.7",
)
@ -71,19 +76,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.gitlab import gitlab
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import GitlabRunner
if __name__ == "__main__":
gitlab(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = GitlabRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="gitlab-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
url="https://gitlab.com/gitlab-com/content-sites/docsy-gitlab",
git_branch="v0.0.7",
)


@ -29,17 +29,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.gcs import gcs
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import GCSRunner
if __name__ == "__main__":
gcs(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = GCSRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="gcs-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
remote_url="gs://utic-test-ingest-fixtures-public/",
recursive=True,
)
@ -69,19 +74,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.gcs import gcs
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import GCSRunner
if __name__ == "__main__":
gcs(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = GCSRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="gcs-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
remote_url="gs://utic-test-ingest-fixtures-public/",
recursive=True,
)


@ -30,17 +30,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.google_drive import gdrive
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import GoogleDriveRunner
if __name__ == "__main__":
gdrive(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = GoogleDriveRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="google-drive-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
drive_id="POPULATE WITH FILE OR FOLDER ID",
service_account_key="POPULATE WITH DRIVE SERVICE ACCOUNT KEY",
recursive=True,
@ -74,19 +79,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.google_drive import gdrive
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import GoogleDriveRunner
if __name__ == "__main__":
gdrive(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = GoogleDriveRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="google-drive-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
drive_id="POPULATE WITH FILE OR FOLDER ID",
service_account_key="POPULATE WITH DRIVE SERVICE ACCOUNT KEY",
recursive=True,


@ -31,18 +31,24 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.jira import jira
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import JiraRunner
if __name__ == "__main__":
jira(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = JiraRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="jira-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
metadata_exclude=["filename", "file_directory", "metadata.data_source.date_processed"],
),
)
runner.run(
url="https://unstructured-jira-connector-test.atlassian.net",
user_email="12345678@unstructured.io",
api_token="ABCDE1234ABDE1234ABCDE1234",
@ -76,20 +82,24 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.jira import jira
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import JiraRunner
if __name__ == "__main__":
jira(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = JiraRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="jira-ingest-output",
num_processes=2,
metadata_exclude=["filename", "file_directory", "metadata.data_source.date_processed"],
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
metadata_exclude=["filename", "file_directory", "metadata.data_source.date_processed"],
),
)
runner.run(
url="https://unstructured-jira-connector-test.atlassian.net",
user_email="12345678@unstructured.io",
api_token="ABCDE1234ABDE1234ABCDE1234",


@ -23,17 +23,25 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.local import local
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import LocalRunner
if __name__ == "__main__":
local(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = LocalRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="local-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
input_path="example-docs",
recursive=True,
)
@ -65,19 +73,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.local import local
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import LocalRunner
if __name__ == "__main__":
local(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = LocalRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="local-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
input_path="example-docs",
recursive=True,
)


@ -30,17 +30,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.notion import notion
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import NotionRunner
if __name__ == "__main__":
notion(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = NotionRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="notion-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
api_key="POPULATE API KEY",
page_ids=["LIST", "OF", "PAGE", "IDS"],
database_ids=["LIST", "OF", "DATABASE", "IDS"],
@ -75,19 +80,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.notion import notion
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import NotionRunner
if __name__ == "__main__":
notion(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = NotionRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="notion-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
api_key="POPULATE API KEY",
page_ids=["LIST", "OF", "PAGE", "IDS"],
database_ids=["LIST", "OF", "DATABASE", "IDS"],


@ -33,17 +33,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.onedrive import onedrive
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import OneDriveRunner
if __name__ == "__main__":
onedrive(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = OneDriveRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="onedrive-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
client_id="<Azure AD app client-id>",
client_cred="<Azure AD app client-secret>",
authority_url="<Authority URL, default is https://login.microsoftonline.com>",
@ -84,19 +89,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.onedrive import onedrive
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import OneDriveRunner
if __name__ == "__main__":
onedrive(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = OneDriveRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="onedrive-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
client_id="<Azure AD app client-id>",
client_cred="<Azure AD app client-secret>",
authority_url="<Authority URL, default is https://login.microsoftonline.com>",


@ -35,17 +35,20 @@ Run Locally
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.outlook import outlook
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import OutlookRunner
if __name__ == "__main__":
outlook(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = OutlookRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="outlook-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
client_id=os.getenv("MS_CLIENT_ID"),
client_cred=os.getenv("MS_CLIENT_CRED"),
tenant=os.getenv("MS_TENANT_ID"),
@ -81,19 +84,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.outlook import outlook
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import OutlookRunner
if __name__ == "__main__":
outlook(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = OutlookRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="outlook-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
client_id=os.getenv("MS_CLIENT_ID"),
client_cred=os.getenv("MS_CLIENT_CRED"),
tenant=os.getenv("MS_TENANT_ID"),


@ -33,17 +33,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.reddit import reddit
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import RedditRunner
if __name__ == "__main__":
reddit(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = RedditRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="reddit-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
subreddit_name="machinelearning",
client_id="<client id here>",
client_secret="<client secret here>",
@ -83,19 +88,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.reddit import reddit
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import RedditRunner
if __name__ == "__main__":
reddit(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = RedditRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="reddit-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
subreddit_name="machinelearning",
client_id="<client id here>",
client_secret="<client secret here>",


@ -28,17 +28,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.s3 import s3
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import S3Runner
if __name__ == "__main__":
s3(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = S3Runner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="s3-small-batch-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/",
anonymous=True,
)
@ -69,19 +74,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.s3 import s3
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import S3Runner
if __name__ == "__main__":
s3(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = S3Runner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="s3-small-batch-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/",
anonymous=True,
)


@ -34,17 +34,20 @@ Run Locally
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.salesforce import salesforce
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import SalesforceRunner
if __name__ == "__main__":
salesforce(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = SalesforceRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="salesforce-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
username=os.getenv("SALESFORCE_USERNAME"),
consumer_key=os.getenv("SALESFORCE_CONSUMER_KEY"),
private_key_path=os.getenv("SALESFORCE_PRIVATE_KEY_PATH"),
@ -82,19 +85,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.salesforce import salesforce
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import SalesforceRunner
if __name__ == "__main__":
salesforce(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = SalesforceRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="salesforce-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
username=os.getenv("SALESFORCE_USERNAME"),
consumer_key=os.getenv("SALESFORCE_CONSUMER_KEY"),
private_key_path=os.getenv("SALESFORCE_PRIVATE_KEY_PATH"),


@ -35,17 +35,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.sharepoint import sharepoint
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import SharePointRunner
if __name__ == "__main__":
sharepoint(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = SharePointRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="sharepoint-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
client_id="<Microsoft Sharepoint app client-id>",
client_cred="<Microsoft Sharepoint app client-secret>",
site="<e.g https://contoso.sharepoint.com to process all sites within tenant>",
@ -56,7 +61,6 @@ Run Locally
# Flag to process only files within the site(s)
files_only=True,
path="Shared Documents",
recursive=False,
)
Run via the API
@ -92,19 +96,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.sharepoint import sharepoint
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import SharePointRunner
if __name__ == "__main__":
sharepoint(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = SharePointRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="sharepoint-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
client_id="<Microsoft Sharepoint app client-id>",
client_cred="<Microsoft Sharepoint app client-secret>",
site="<e.g https://contoso.sharepoint.com to process all sites within tenant>",
@ -115,7 +123,6 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
# Flag to process only files within the site(s)
files_only=True,
path="Shared Documents",
recursive=False,
)
Additionally, you will need to pass the ``--partition-endpoint`` if you're running the API locally. You can find more information about the ``unstructured`` API `here <https://github.com/Unstructured-IO/unstructured-api>`_.


@ -30,17 +30,22 @@ Run Locally
.. code:: python
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.slack import slack
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import SlackRunner
if __name__ == "__main__":
slack(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = SlackRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="slack-ingest-download",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
channels=["12345678"],
token="12345678",
start_date="2023-04-01T01:00:00-08:00",
@ -75,19 +80,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.slack import slack
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import SlackRunner
if __name__ == "__main__":
slack(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = SlackRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="slack-ingest-download",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
channels=["12345678"],
token="12345678",
start_date="2023-04-01T01:00:00-08:00",


@ -28,18 +28,22 @@ Run Locally
.. code:: python
from unstructured.ingest.runner.wikipedia import wikipedia
from unstructured.ingest.interfaces import ReadConfig, PartitionConfig
import os
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import WikipediaRunner
if __name__ == "__main__":
wikipedia(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = WikipediaRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="wikipedia-ingest-output",
num_processes=2
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(),
)
runner.run(
page_title="Open Source Software",
auto_suggest=False,
)
@ -70,19 +74,23 @@ You can also use upstream connectors with the ``unstructured`` API. For this you
import os
from unstructured.ingest.interfaces import PartitionConfig, ReadConfig
from unstructured.ingest.runner.wikipedia import wikipedia
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import WikipediaRunner
if __name__ == "__main__":
wikipedia(
verbose=True,
read_config=ReadConfig(),
partition_config=PartitionConfig(
runner = WikipediaRunner(
processor_config=ProcessorConfig(
verbose=True,
output_dir="wikipedia-ingest-output",
num_processes=2,
),
read_config=ReadConfig(),
partition_config=PartitionConfig(
partition_by_api=True,
api_key=os.getenv("UNSTRUCTURED_API_KEY"),
),
)
runner.run(
page_title="Open Source Software",
auto_suggest=False,
)


@ -1 +1 @@
__version__ = "0.10.24" # pragma: no cover
__version__ = "0.10.25-dev0" # pragma: no cover


@ -36,14 +36,16 @@ just execute `unstructured/ingest/main.py`, e.g.:
--output-dir s3-small-batch-output \
--num-processes 2
## Adding Data Connectors
## Adding Source Data Connectors
To add a connector, refer to [unstructured/ingest/connector/github.py](unstructured/ingest/connector/github.py) as example that implements the three relevant abstract base classes.
To add a connector, refer to [unstructured/ingest/connector/github.py](unstructured/ingest/connector/github.py) as an example that implements the three relevant abstract base classes.
If the connector has an available `fsspec` implementation, then refer to [unstructured/ingest/connector/s3.py](unstructured/ingest/connector/s3.py).
Then, update [unstructured/ingest/main.py/cli](unstructured/ingest/cli) to add a subcommand associated with the connector, and hook it up to the parent group.
Add an implementation of `BaseRunner` in the runner directory to connect the invocation of the CLI with the underlying connector created.
Create at least one folder [examples/ingest](examples/ingest) with an easily reproducible
script that shows the new connector in action.
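As a rough sketch of the runner step above, a new runner typically subclasses the existing base runner and exposes the connector-specific arguments on `run()`, following the pattern visible in the runner diffs later in this change. Everything below (the connector name, its arguments, and the import locations for the base class and logging helper) is a hypothetical placeholder rather than the actual required interface:
```python
import logging
import typing as t

# Import locations below are assumptions based on the existing runners.
from unstructured.ingest.logger import ingest_log_streaming_init
from unstructured.ingest.runner.base_runner import Runner


class AcmeRunner(Runner):  # hypothetical connector named "acme"
    def run(
        self,
        workspace_id: str,                  # made-up connector-specific option
        api_token: t.Optional[str] = None,  # made-up optional credential
        **kwargs,
    ):
        # Same logging bootstrap the existing runners use.
        ingest_log_streaming_init(logging.DEBUG if self.processor_config.verbose else logging.INFO)
        # ...instantiate the Acme connector with the aggregated configs and kick off processing...
```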
@ -54,6 +56,16 @@ to be checked into CI under test_unstructured_ingest/expected-structured-output/
The `main.py` flags of --re-download/--no-re-download, --download-dir, --preserve-downloads, --structured-output-dir, and --reprocess are honored by the connector.
## Adding Destination Data Connectors
To add a destination connector, refer to [unstructured/ingest/connector/delta-table.py](unstructured/ingest/connector/delta-table.py) as an example that extends `BaseDestinationConnector` and `WriteConfig`. It also shows how an existing data provider can be used for both a source and a destination connector.
Similar to the runner that connects source connectors to the CLI, destination connectors require an entry in the writer map defined in [unstructured/ingest/runner/writers.py](unstructured/ingest/runner/writers.py). This allows any source connector to be paired with any destination connector.
In the CLI, destination connectors are exposed as a subcommand added to each source connector's parent command. Take special care not to break the code run by the source connector; see how the base runner class is dynamically pulled using the name of the parent CLI command in [unstructured/ingest/cli/cmds/delta_table.py](unstructured/ingest/cli/cmds/delta_table.py).
Tests and examples should also be added to demonstrate and validate the destination connector, following the same steps laid out for a source connector.
### The checklist:
In checklist form, the above steps are summarized as:
@ -76,11 +88,11 @@ In checklist form, the above steps are summarized as:
- [ ] The added dependencies should be imported at runtime when the new connector is invoked, rather than as top-level imports.
- [ ] Add the decorator `unstructured.utils.requires_dependencies` on top of each class or function that uses those connector-specific dependencies, e.g. for `GitHubConnector` it should look like `@requires_dependencies(dependencies=["github"], extras="github")`
- [ ] Run `make tidy` and `make check` to ensure linting checks pass.
- [ ] Update ingest documentation [here](https://github.com/Unstructured-IO/unstructured/tree/eb8ce8913729826b62fd4e1224f70d67c5289b9d/docs/source)
- [ ] Update ingest documentation [here](https://github.com/Unstructured-IO/unstructured/tree/main/docs/source)
- [ ] For team members that are developing in the original repository:
- [ ] If there are secret variables created for the connector tests, make sure to:
- [ ] add the secrets into Github (contact someone with access)
- [ ] include the secret variables in [`ci.yml`](https://github.com/Unstructured-IO/unstructured/blob/eb8ce8913729826b62fd4e1224f70d67c5289b9d/.github/workflows/ci.yml) and [`ingest-test-fixtures-update-pr.yml`](https://github.com/Unstructured-IO/unstructured/blob/eb8ce8913729826b62fd4e1224f70d67c5289b9d/.github/workflows/ingest-test-fixtures-update-pr.yml)
- [ ] include the secret variables in [`ci.yml`](https://github.com/Unstructured-IO/unstructured/blob/main/.github/workflows/ci.yml) and [`ingest-test-fixtures-update-pr.yml`](https://github.com/Unstructured-IO/unstructured/blob/main/.github/workflows/ingest-test-fixtures-update-pr.yml)
- [ ] add a make install line in the workflow configurations to provide the workflow machine with the connector's required dependencies while testing
- [ ] Whenever necessary, use the [ingest update test fixtures](https://github.com/Unstructured-IO/unstructured/actions/workflows/ingest-test-fixtures-update-pr.yml) workflow to update the test fixtures.
- [ ] Honors the conventions of `BaseConnectorConfig` defined in [unstructured/ingest/interfaces.py](unstructured/ingest/interfaces.py) which is passed through [the CLI](unstructured/ingest/main.py):
@ -95,18 +107,42 @@ In checklist form, the above steps are summarized as:
`unstructured/ingest/main.py` is the entrypoint for the `unstructured-ingest` CLI. It calls the CLI command fetched via `get_cmd()` in `cli.py`.
`get_cmd()` aggregates all subcommands (one per connector) as defined in the cli.cmd module. Each of these per-connector commands defines the connector-specific options and imports the relevant common options. They call out to the corresponding cli.runner.[CONNECTOR].py module.
The ingest directory is broken up in such a way that most of the code can be used with or without invoking the CLI itself:
The runner is a vanilla (not Click wrapped) Python function which also explicitly exposes the connector specific arguments. It instantiates the connector with aggregated options / configs and passes it in call to `process_documents()` in `processor.py`.
* **Connector:** This houses the main code responsible for reaching out to external data providers and pulling down the data (e.g. S3, Azure, etc.).
* **Runner:** This serves as the interface between the CLI-specific commands and running the connector code. A base runner class defines much of the common functionality across all connectors and allows typed methods to be defined that explicitly connect a CLI command to its connector (see the sketch just after this list).
* **CLI:** This is where the `Click` Python library is introduced to create the CLI bindings a user interacts with when invoking the CLI directly. Many of the common options across commands are abstracted away and added dynamically to the Click commands.
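To make the layering concrete, the runner layer can be driven directly from Python without going through the CLI at all; this sketch just reproduces the pattern from the updated `s3.rst` example in this change:
```python
from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
from unstructured.ingest.runner import S3Runner

# Configs describe the processing pipeline; connector-specific options go to run().
runner = S3Runner(
    processor_config=ProcessorConfig(
        verbose=True,
        output_dir="s3-small-batch-output",
        num_processes=2,
    ),
    read_config=ReadConfig(),
    partition_config=PartitionConfig(),
)
runner.run(
    remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/",
    anonymous=True,
)
```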
![unstructured ingest cli diagram](img/unstructured_ingest_cli_diagram.jpg)
The ingest flow is similar to an ETL pipeline that gets defined at runtime based on user input:
Given an instance of BaseConnector with a reference to its ConnectorConfig (BaseConnectorConfig and StandardConnectorConfig) and set of processing parameters, `process_documents()` instantiates the Processor class and calls its `run()` method.
![unstructured ingest cli diagram](img/unstructured_ingest_cli_pipeline_diagram.png)
The Processor class (operating in the Main Process) calls to the connector to fetch `get_ingest_docs()`, a list of lazy download IngestDocs, each a skinny serializable object with connection config. These IngestDocs are filtered (where output results already exist locally) and passed to a multiprocessing Pool. Each subprocess in the pool then operates as a Worker Process. The Worker Process first initializes a logger, since it is operating in its own spawn of the Python interpreter. It then calls the `process_document()` function in `doc_processor.generalized.py`.
Each step in the pipeline caches its results in a default location if one is not provided. If an error occurs before the pipeline finishes, it can pick up where it left off without recomputing everything that already ran successfully.
Each step's cache key is a hash of the parameters passed to that step combined with the previous step's key, so the pipeline knows whether existing results are still valid or need to be recomputed even though they already exist. This means you can change the parameters of a step near the tail end of the pipeline and only the steps from that point on are recomputed.
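A toy illustration of that keying scheme (purely illustrative; the real pipeline's hashing and storage details may differ):
```python
import hashlib
import json


def step_cache_key(step_params: dict, previous_key: str = "") -> str:
    # Hash this step's params together with the previous step's key.
    payload = json.dumps(step_params, sort_keys=True) + previous_key
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


partition_key = step_cache_key({"strategy": "fast"})
chunk_key = step_cache_key({"chunk_size": 500}, previous_key=partition_key)
# Re-running with the same params yields the same keys, so cached results are reused;
# changing only the chunking params changes chunk_key but leaves partition_key intact.
```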
The `process_document()` function is given an IngestDoc, which has a reference to the respective ConnectorConfigs. Also defined is a global session_handler (of type BaseSessionHandler). This contains any session/connection relevant data for the IngestDoc that can be re-used when processing sibling IngestDocs from the same BaseConnector / config. If the value for the session_handle isn't assigned, a session_handle is created from the IngestDoc and assigned to the global variable, otherwise the existing global variable value ("session") is leveraged to process the IngestDoc. The function proceeds to call the IngestDoc's `get_file()`, `process_file()`, `write_result()`, and `clean_up()` methods.
**Multiprocessing:** One of the pipeline options is how many processes to use. Not all steps support multiprocessing, but those that do use a multiprocessing Pool to speed up the work. If a single process is set, the multiprocessing Pool isn't used at all, which makes debugging easier.
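A minimal sketch of that behavior (illustrative only, not the pipeline's actual worker code):
```python
from multiprocessing import Pool


def run_step(process_fn, docs, num_processes: int):
    if num_processes == 1:
        # Single-process path stays in the main interpreter, which is easier to debug.
        return [process_fn(doc) for doc in docs]
    # Otherwise fan the work out across a multiprocessing Pool.
    with Pool(processes=num_processes) as pool:
        return pool.map(process_fn, docs)
```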
Once all multiprocessing subprocesses complete a final call to the BaseConnector `clean_up()` method is made.
While all the configurations are added as options to a single Click command when the CLI is invoked, many of them are bundled together according to the pipeline step they belong to. A `BaseConfig`
is extended in the root interfaces file, and that can be extended once again in the CLI-specific interfaces file, which adds a function describing how the fields in the base config should be mapped to `Click` options.
### Configs
* `PartitionConfig`: Data associated with running the partitioning over the files pulled down via the source connector (see the sketch after this list).
* `ProcessorConfig`: Data about the process as a whole, such as the number of processes to use when running, where to store the final result of the pipeline, and whether an error should be raised if a single doc fails. By default, the pipeline continues with what it can, so if one doc out of many fails, an error is logged and the rest continue.
* `ReadConfig`: Data associated with pulling the data from the source data provider, such as whether it should be re-downloaded even if the files already exist locally.
* `EmbeddingConfig`: Data associated with running an optional embedder on the data, which adds a new field to the output JSON for each element with its associated embedding vector.
* `ChunkingConfig`: Data associated with running an optional chunker over the partitioned data.
* `PermissionsConfig`: Data associated with pulling down permissions data (i.e. RBAC). This is an optional feature and if enabled, will append the information pulled down to the metadata associated with an element.
* `WriteConfig`: Any specific data needed to write to a destination connector. This does not have to be used if not needed.
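The split between these configs is visible in the updated connector docs in this change; for example, the Confluence snippet keeps process-wide options on `ProcessorConfig` while partition-specific behavior (including partitioning via the hosted API) stays on `PartitionConfig`:
```python
import os

from unstructured.ingest.interfaces import PartitionConfig, ProcessorConfig

processor_config = ProcessorConfig(
    verbose=True,
    output_dir="confluence-ingest-output",
    num_processes=2,
)
partition_config = PartitionConfig(
    metadata_exclude=["filename", "file_directory", "metadata.data_source.date_processed"],
    partition_by_api=True,
    api_key=os.getenv("UNSTRUCTURED_API_KEY"),
)
```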
![unstructured ingest processing diagram](img/unstructured_ingest_processing_diagram.jpg)
For the flow of the pipeline, the only required steps are:
* **Doc Factory:** This creates instances of `BaseIngestDoc` which provide references to a file on the source data provider without downloading anything yet.
* **Source Node:** This is responsible for downloading the content and producing a representation of it suitable for partitioning.
* **Partitioner:** Responsible for running partition over the content produced by the previous source node.
Optional Steps:
* **Reformat Nodes:** Any number of reformat nodes can be set to modify the partitioned content. Currently chunking and embedding are supported.
* **Write Node:** If set, write the results to a destination via a destination connector.
Because there can be any number of reformat nodes, the location of the final output is not deterministic, so an extra step is added after all reformat nodes to copy the final result to the location the user expects when the pipeline ends.

[Binary image files changed (not shown): one diagram removed (37 KiB), one added (90 KiB), one removed (166 KiB).]


@ -9,9 +9,11 @@ from unstructured.ingest.runner.utils import update_download_dir_remote_url
class AzureRunner(FsspecBaseRunner):
def run(
self,
account_name: t.Optional[str],
account_key: t.Optional[str],
connection_string: t.Optional[str],
remote_url: str,
account_name: t.Optional[str] = None,
account_key: t.Optional[str] = None,
connection_string: t.Optional[str] = None,
recursive: bool = False,
**kwargs,
):
ingest_log_streaming_init(logging.DEBUG if self.processor_config.verbose else logging.INFO)


@ -10,9 +10,9 @@ from unstructured.ingest.runner.utils import update_download_dir_hash
class BiomedRunner(Runner):
def run(
self,
max_retries: int,
max_request_time: int,
decay: float,
max_retries: int = 5,
max_request_time: int = 45,
decay: float = 0.3,
path: t.Optional[str] = None,
api_id: t.Optional[str] = None,
api_from: t.Optional[str] = None,


@ -13,8 +13,8 @@ class ConfluenceRunner(Runner):
url: str,
user_email: str,
api_token: str,
max_num_of_spaces: int,
max_num_of_docs_from_each_space: int,
max_num_of_spaces: int = 500,
max_num_of_docs_from_each_space: int = 100,
spaces: t.Optional[t.List[str]] = None,
**kwargs,
):