feat: airtable connector (#1012)

* add the first version of airtable connector

* make imports inline so ingestion fails gracefully when an optional dependency is missing

* parse tables as csv rather than plain text

* add the logic needed to support --airtable-list-of-paths

* add a script for creating test resources; add a test script (large) that ingests a large number of tables to validate scroll functionality; update the test script (diff) based on the new settings

* fix ingest test names

* add scripts for the large table test

* remove large table test from diff test

* make base and table ids explicit

* add and remove comments

* use -ne instead of !=

* update code based on the recent ingest refactor, update changelog and version

* shellcheck fix

* update comments

* update check-num-rows-and-columns-output error message

Co-authored-by: ryannikolaidis <1208590+ryannikolaidis@users.noreply.github.com>

* update help comments

* update help comments

* update help comments

* update workflows to set auth tokens and to run make install

* add comments on create_scale_test_components

* separate component ids from the test script, add comments to document test component creation

* add LARGE_BASE test, implement LARGE_BASE component creation, replace component id

* shellcheck fixes

* shellcheck fixes

* update docs

* update comment

* bump version

* restore a wrongly deleted file

* sort columns before saving to process

* Update ingest test fixtures (#1098)

Co-authored-by: ahmetmeleq <ahmetmeleq@users.noreply.github.com>

---------

Co-authored-by: ryannikolaidis <1208590+ryannikolaidis@users.noreply.github.com>
Co-authored-by: ahmetmeleq <ahmetmeleq@users.noreply.github.com>
Ahmet Melek 2023-08-11 22:02:51 +03:00 committed by GitHub
parent fa5a3dbd81
commit 627f78c16f
28 changed files with 771 additions and 2 deletions


@@ -186,6 +186,7 @@ jobs:
PYTHONPATH=. pytest test_unstructured_ingest/unit
- name: Test (end-to-end)
env:
AIRTABLE_PERSONAL_ACCESS_TOKEN: ${{ secrets.AIRTABLE_PERSONAL_ACCESS_TOKEN }}
BOX_APP_CONFIG: ${{ secrets.BOX_APP_CONFIG }}
CONFLUENCE_API_TOKEN: ${{ secrets.CONFLUENCE_API_TOKEN }}
CONFLUENCE_USER_EMAIL: ${{ secrets.CONFLUENCE_USER_EMAIL }}
@@ -214,6 +215,7 @@ jobs:
sudo apt-get install -y tesseract-ocr-kor
tesseract --version
make install-ingest-s3
make install-ingest-airtable
make install-ingest-azure
make install-ingest-box
make install-ingest-confluence


@@ -58,6 +58,7 @@ jobs:
make install-ci
- name: Update test fixtures
env:
AIRTABLE_PERSONAL_ACCESS_TOKEN: ${{ secrets.AIRTABLE_PERSONAL_ACCESS_TOKEN }}
BOX_APP_CONFIG: ${{ secrets.BOX_APP_CONFIG }}
CONFLUENCE_API_TOKEN: ${{ secrets.CONFLUENCE_API_TOKEN }}
CONFLUENCE_USER_EMAIL: ${{ secrets.CONFLUENCE_USER_EMAIL }}
@@ -88,6 +89,7 @@ jobs:
tesseract --version
make install-ingest-s3
make install-ingest-azure
make install-ingest-airtable
make install-ingest-box
make install-ingest-confluence
make install-ingest-discord


@@ -1,4 +1,4 @@
## 0.9.3-dev0
## 0.9.3-dev1
### Enhancements
@@ -9,10 +9,13 @@
### Features
* Add Airtable Connector to be able to pull views/tables/bases from an Airtable organization
### Fixes
## 0.9.2
### Enhancements
* Update table extraction section in API documentation to sync with change in Prod API


@@ -157,6 +157,10 @@ install-ingest-elasticsearch:
install-ingest-confluence:
python3 -m pip install -r requirements/ingest-confluence.txt
.PHONY: install-ingest-airtable
install-ingest-airtable:
python3 -m pip install -r requirements/ingest-airtable.txt
.PHONY: install-unstructured-inference
install-unstructured-inference:
python3 -m pip install -r requirements/local-inference.txt
@@ -210,6 +214,7 @@ pip-compile:
pip-compile --upgrade requirements/ingest-onedrive.in
pip-compile --upgrade requirements/ingest-outlook.in
pip-compile --upgrade requirements/ingest-confluence.in
pip-compile --upgrade requirements/ingest-airtable.in
## install-project-local: install unstructured into your local python environment
.PHONY: install-project-local


@@ -21,6 +21,11 @@ You can also use connectors with the ``unstructured`` API. For this you'll need
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.
``Airtable Connector``
----------------------
You can batch process documents stored in your Airtable using the `Airtable Connector <https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/ingest/connector/airtable.py>`_. You can find an example of how to use it `here <https://github.com/Unstructured-IO/unstructured/blob/f5541c7b0b1e2fc47ec88da5e02080d60e1441e2/examples/ingest/airtable/airtable.sh>`_.
To install all dependencies for this connector run: ``pip install "unstructured[airtable]"``
``Azure Connector``
--------------------


@@ -0,0 +1,51 @@
#!/usr/bin/env bash
# Processes all the documents in all bases (in all workspaces) within an Airtable org,
# using the `unstructured` library.
# Structured outputs are stored in airtable-ingest-output
SCRIPT_DIR=$(dirname "$(realpath "$0")")
cd "$SCRIPT_DIR"/../../.. || exit 1
# Required arguments:
# --personal-access-token
# --> Personal access token to authenticate into Airtable.
# Check https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens for more info.
# Optional arguments that you can use:
# --list-of-paths
# --> A list of paths that specify the locations to ingest data from within Airtable.
# If this argument is not set, the connector ingests all tables within each and every base.
# --list-of-paths: path1 path2 path3 ...
# path: base_id/table_id(optional)/view_id(optional)/
# To obtain (base, table, view) ids in bulk, check:
# https://airtable.com/developers/web/api/list-bases (base ids)
# https://airtable.com/developers/web/api/get-base-schema (table and view ids)
# https://pyairtable.readthedocs.io/en/latest/metadata.html (base, table and view ids)
# To obtain specific ids from Airtable UI, go to your workspace, and copy any
# relevant id from the URL structure:
# https://airtable.com/appAbcDeF1ghijKlm/tblABcdEfG1HIJkLm/viwABCDEfg6hijKLM
# appAbcDeF1ghijKlm -> base_id
# tblABcdEfG1HIJkLm -> table_id
# viwABCDEfg6hijKLM -> view_id
# You can also check: https://support.airtable.com/docs/finding-airtable-ids
# Here is an example for one --list-of-paths:
# base1/ → gets the entirety of all tables inside base1
# base1/table1 → gets all rows and columns within table1 in base1
# base1/table1/view1 → gets the rows and columns that are visible in view1 for the table1 in base1
# Examples of invalid paths:
# table1 → has to mention base to be valid
# base1/view1 → has to mention table to be valid
PYTHONPATH=. ./unstructured/ingest/main.py \
airtable \
--metadata-exclude filename,file_directory,metadata.data_source.date_processed \
--personal-access-token "$AIRTABLE_PERSONAL_ACCESS_TOKEN" \
--structured-output-dir airtable-ingest-output \
--num-processes 2 \
--reprocess
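For a run scoped to specific locations, a hypothetical invocation using --list-of-paths could look like the sketch below (the base and table ids are placeholders; every flag shown is used elsewhere in this changeset):

PYTHONPATH=. ./unstructured/ingest/main.py \
    airtable \
    --personal-access-token "$AIRTABLE_PERSONAL_ACCESS_TOKEN" \
    --list-of-paths "appAbcDeF1ghijKlm appAbcDeF1ghijKlm/tblABcdEfG1HIJkLm" \
    --structured-output-dir airtable-ingest-output \
    --num-processes 2 \
    --reprocess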


@@ -0,0 +1,3 @@
-c constraints.in
-c base.txt
pyairtable


@@ -0,0 +1,39 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile requirements/ingest-airtable.in
#
certifi==2023.7.22
# via
# -c requirements/base.txt
# -c requirements/constraints.in
# requests
charset-normalizer==3.2.0
# via
# -c requirements/base.txt
# requests
idna==3.4
# via
# -c requirements/base.txt
# requests
inflection==0.5.1
# via pyairtable
pyairtable==2.0.0
# via -r requirements/ingest-airtable.in
pydantic==1.10.12
# via pyairtable
requests==2.31.0
# via
# -c requirements/base.txt
# pyairtable
typing-extensions==4.7.1
# via
# pyairtable
# pydantic
urllib3==1.26.16
# via
# -c requirements/base.txt
# -c requirements/constraints.in
# pyairtable
# requests


@@ -0,0 +1,32 @@
#!/usr/bin/env bash
# Components below were all created using the Airtable UI; however, if they need
# to be recreated, it is also possible to create them using the Web API.
# pyairtable does not yet support creating these components (bases, tables).
# For documentation on the Web API for creating bases, check:
# https://airtable.com/developers/web/api/create-base
# For creating lots of tables inside a base, check:
# create_scale_test_components.sh
LARGE_TABLE_BASE_ID="appQqieVsbxpwwD3i"
LARGE_TABLE_TABLE_ID="tbll85GCfxED1OrvC"
LARGE_BASE_BASE_ID="appjPRwoyawsapoGW"
LARGE_WORKSPACE_BASE_ID_1="appSSCNWuIMjzeraO"
LARGE_WORKSPACE_BASE_ID_2="appyvCsaHWn38RzFc"
LARGE_WORKSPACE_BASE_ID_3="appbd8fkBv3AXj0Ab"
LARGE_WORKSPACE_BASE_ID_4="appHEvCPnpfiAwjPE"
LARGE_WORKSPACE_BASE_ID_5="appL9ND7LVWaItAmC"
LARGE_WORKSPACE_BASE_ID_6="appOGnidMsh93yCQI"
LARGE_WORKSPACE_BASE_ID_7="apps71HjvZRRgqHkz"
LARGE_WORKSPACE_BASE_ID_8="appvDbw5f7jCQqdsr"
LARGE_WORKSPACE_BASE_ID_9="appGFdtbLmqf2k8Ly"
LARGE_WORKSPACE_BASE_ID_10="appTn61bfU8vCIkGf"
LARGE_WORKSPACE_BASE_ID_11="app1c4CtIQ4ZToHIR"
LARGE_WORKSPACE_BASE_ID_12="apphvDFg6OC7l1xwo"
# shellcheck disable=SC2034
LARGE_TEST_LIST_OF_PATHS="$LARGE_BASE_BASE_ID $LARGE_TABLE_BASE_ID $LARGE_WORKSPACE_BASE_ID_1 $LARGE_WORKSPACE_BASE_ID_2 $LARGE_WORKSPACE_BASE_ID_3 $LARGE_WORKSPACE_BASE_ID_4 $LARGE_WORKSPACE_BASE_ID_5 $LARGE_WORKSPACE_BASE_ID_6 $LARGE_WORKSPACE_BASE_ID_7 $LARGE_WORKSPACE_BASE_ID_8 $LARGE_WORKSPACE_BASE_ID_9 $LARGE_WORKSPACE_BASE_ID_10 $LARGE_WORKSPACE_BASE_ID_11 $LARGE_WORKSPACE_BASE_ID_12"
export LARGE_TABLE_BASE_ID
export LARGE_TABLE_TABLE_ID
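The comments above defer base creation to the Web API. A minimal, untested sketch of such a call against the create-base endpoint is shown below; the workspace id is a placeholder, and the endpoint requires the new base to be created with at least one table containing at least one field:

curl -X POST "https://api.airtable.com/v0/meta/bases" \
  -H "Authorization: Bearer $AIRTABLE_ACCESS_TOKEN_WRITE" \
  -H "Content-Type: application/json" \
  --data '{"name": "LARGE_BASE", "workspaceId": "wspAbcDeF1ghijKlm", "tables": [{"name": "Table 1", "fields": [{"name": "Name", "type": "singleLineText"}]}]}'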


@@ -0,0 +1,82 @@
import os
# import pyairtable as pyair
from pyairtable import Api
from unstructured.ingest.logger import logger
SCALE_TEST_NUMBER_OF_RECORDS = 20_000
# Access token that has read and write permissions for the respective workspace
token = os.environ["AIRTABLE_ACCESS_TOKEN_WRITE"]
# You can find the IDs below defined in component_ids.sh
# If new ones need to be created, there is guidance below and in component_ids.sh.
# ID of a new base that is intended to contain one large table.
# The table will be filled using this python script.
# If the ID is not in the environment, it is possible to create a new base
# via the Airtable UI, and get the base ID from the URL structure.
# (https://support.airtable.com/docs/finding-airtable-ids)
large_table_base_id = os.environ["LARGE_TABLE_BASE_ID"]
# ID of the one table inside the base "large_table_base".
# The table is intended to be large, and will be filled using this python script.
# If the ID is not in the environment, it is possible to create a new table
# via the Airtable UI, and get the table ID from the URL structure.
# (https://support.airtable.com/docs/finding-airtable-ids)
large_table_table_id = os.environ["LARGE_TABLE_TABLE_ID"]
# ID of a base that is intended to contain lots of tables.
# large_base_base_id = os.environ["LARGE_BASE_BASE_ID"]
# Creating tables is not yet supported in pyairtable. Try the Airtable Web API instead:
# https://airtable.com/developers/web/api/create-base
def create_n_bases(api, number_of_bases):
raise NotImplementedError(
"Creating bases is not yet supported in pyairtable. \
Try Airtable Web API instead: \
https://airtable.com/developers/web/api/create-base",
)
# if len(pyair.metadata.get_api_bases(api)["bases"])>99:
# logger.warning("Airtable Org already has a high number of bases. \
# Aborting creation of new bases to avoid duplication and bloating.")
# return
number_of_bases  # intentionally unused until base creation is supported
def create_n_tables(base, number_of_tables):
raise NotImplementedError(
"Creating tables is not yet supported in pyairtable. \
Try Airtable Web API instead: \
https://airtable.com/developers/web/api/create-table",
)
# if len(pyair.metadata.get_base_schema(base)["tables"])>99:
# logger.warning("Base already has a high number of tables. \
# Aborting creation of new tables to avoid duplication and bloating.")
# return
def create_n_records(table, number_of_records):
logger.warning(
"Fetching table records to count, before creation of new records.\
This should take around 1 second per 415 records.",
)
if len(table.all()) > SCALE_TEST_NUMBER_OF_RECORDS - 1:
logger.warning(
"Table already has a high number of records. \
Aborting creation of new records to avoid duplication and bloating.",
)
return
records = [{"Name": f"My Name is {i}"} for i in range(number_of_records)]
table.batch_create(records)
if __name__ == "__main__":
api = Api(token)
large_table = api.table(large_table_base_id, large_table_table_id)
logger.info("Creating records, this should take about 1 second per 40 records.")
create_n_records(large_table, SCALE_TEST_NUMBER_OF_RECORDS)
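Assuming the script lives alongside the other helpers (the filename below is hypothetical), a run could look like the following, with the component ids exported by component_ids.sh and a write-scoped token in the environment:

source ./scripts/airtable-test-helpers/component_ids.sh
export AIRTABLE_ACCESS_TOKEN_WRITE="pat..."  # placeholder write token
python scripts/airtable-test-helpers/create_scale_test_components.py  # hypothetical filename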


@@ -0,0 +1,15 @@
#!/usr/bin/env bash
# This script creates a large number of tables inside an Airtable base.
# shellcheck disable=SC2001,SC1091
source ./scripts/airtable-test-helpers/component_ids.sh
base_data='{"description": "Table-X of the test tables for the test LARGE_BASE.", "fields": [{"description": "Name of the row","name": "Name","type": "singleLineText"}],"name": "LARGE_BASE_TABLE_X"}'
for ((i=1; i<=100; i++)); do
item="$(echo "$base_data" | sed "s/X/$i/g")"
curl -X POST "https://api.airtable.com/v0/meta/bases/$LARGE_BASE_BASE_ID/tables" \
-H "Authorization: Bearer $AIRTABLE_ACCESS_TOKEN_WRITE2" \
-H "Content-Type: application/json" \
--data "$item"
done
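Because the script sources component_ids.sh through a repository-relative path, it is presumably invoked from the repository root, e.g.:

export AIRTABLE_ACCESS_TOKEN_WRITE2="pat..."  # placeholder write token
./scripts/airtable-test-helpers/create_scale_test_components.sh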


@@ -0,0 +1,28 @@
import argparse
import json
from io import StringIO
import pandas as pd
def number_of_rows(file_path):
with open(file_path) as file:
data = json.load(file)
df = pd.read_csv(StringIO(data[0]["text"]))
return len(df)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Read Unstructured Ingest output file and print the number of rows",
)
parser.add_argument(
"--structured-output-file-path",
help="Path to Unstructured Ingest output file",
)
args = parser.parse_args()
output_path = args.structured_output_file_path
print(number_of_rows(output_path))


@@ -140,6 +140,7 @@ setup(
"onedrive": load_requirements("requirements/ingest-onedrive.in"),
"outlook": load_requirements("requirements/ingest-outlook.in"),
"confluence": load_requirements("requirements/ingest-confluence.in"),
"airtable": load_requirements("requirements/ingest-airtable.in"),
# Legacy extra requirements
"huggingface": load_requirements("requirements/huggingface.in"),
"local-inference": all_doc_reqs,


@@ -0,0 +1,22 @@
#!/usr/bin/env bash
# Description: Validate that the number of rows in the output dataframe is as expected.
#
# Arguments:
# - $1: The expected number of rows in the dataframe.
# - $2: The path for the structured output file.
SCRIPT_PATH="scripts/airtable-test-helpers/print_num_rows_df.py"
EXPECTED_ROWS=$1
OUTPUT_FILE_NAME=$2
# Run the Python script and capture its output
ROWS=$(python "$SCRIPT_PATH" --structured-output-file-path "$OUTPUT_FILE_NAME")
# Compare the actual output with the expected output
if [[ $ROWS -ne $EXPECTED_ROWS ]]; then
echo
echo "ERROR: $ROWS rows created. $EXPECTED_ROWS rows should have been created."
exit 1
fi
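Run from the repository root (SCRIPT_PATH above is repository-relative), an invocation mirrors the one in the large ingest test further below, passing the expected row count and the output file path:

sh test_unstructured_ingest/check-num-rows-and-columns-output.sh 39999 path/to/structured-output-file.json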

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long


@@ -0,0 +1,33 @@
#!/usr/bin/env bash
set -e
# Description: This test checks if all the processed content is the same as the expected outputs.
# Also checks if a large table can be ingested properly.
SCRIPT_DIR=$(dirname "$(realpath "$0")")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=airtable-diff
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
VARIED_DATA_BASE_ID="app5YQxSfp220fWtm"
VARIED_DATA_BASE_ID_2="appJ43QmP8I17zu88"
if [ -z "$AIRTABLE_PERSONAL_ACCESS_TOKEN" ]; then
echo "Skipping Airtable ingest test because the AIRTABLE_PERSONAL_ACCESS_TOKEN is not set."
exit 0
fi
PYTHONPATH=. ./unstructured/ingest/main.py \
airtable \
--download-dir "$DOWNLOAD_DIR" \
--personal-access-token "$AIRTABLE_PERSONAL_ACCESS_TOKEN" \
--list-of-paths "$VARIED_DATA_BASE_ID $VARIED_DATA_BASE_ID_2" \
--metadata-exclude filename,file_directory,metadata.data_source.date_processed,metadata.date,metadata.last_modified \
--num-processes 2 \
--preserve-downloads \
--reprocess \
--structured-output-dir "$OUTPUT_DIR"
sh "$SCRIPT_DIR"/check-diff-expected-output.sh $OUTPUT_FOLDER_NAME


@@ -0,0 +1,50 @@
#!/usr/bin/env bash
set -e
# Description: This test checks if the number of bases and tables processed are as expected.
# Each base shows up as a directory in the output folder, hence check-num-dirs-output.sh
# Each table shows up as a file in a base directory, hence check-num-files-output.sh
SCRIPT_DIR=$(dirname "$(realpath "$0")")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=airtable-large
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
if [ -z "$AIRTABLE_PERSONAL_ACCESS_TOKEN" ]; then
echo "Skipping Airtable ingest test because the AIRTABLE_PERSONAL_ACCESS_TOKEN is not set."
exit 0
fi
# Provides component IDs such as LARGE_TEST_LIST_OF_PATHS,
# LARGE_TABLE_BASE_ID, LARGE_TABLE_TABLE_ID, and LARGE_BASE_BASE_ID
# shellcheck disable=SC1091
source ./scripts/airtable-test-helpers/component_ids.sh
PYTHONPATH=. ./unstructured/ingest/main.py \
airtable \
--download-dir "$DOWNLOAD_DIR" \
--personal-access-token "$AIRTABLE_PERSONAL_ACCESS_TOKEN" \
--list-of-paths "$LARGE_TEST_LIST_OF_PATHS" \
--metadata-exclude filename,file_directory,metadata.data_source.date_processed,metadata.date \
--num-processes 2 \
--preserve-downloads \
--reprocess \
--structured-output-dir "$OUTPUT_DIR"
# We are expecting fifteen directories: fourteen bases and the parent directory
sh "$SCRIPT_DIR"/check-num-dirs-output.sh 15 "$OUTPUT_FOLDER_NAME"
# We are expecting 101 files: 100 tables and the parent directory
sh "$SCRIPT_DIR"/check-num-files-output.sh 101 "$OUTPUT_FOLDER_NAME"/"$LARGE_BASE_BASE_ID"/
# Test on ingesting a large number of bases
for i in {1..12}; do
var="LARGE_WORKSPACE_BASE_ID_$i"
sh "$SCRIPT_DIR"/check-num-files-output.sh 12 "$OUTPUT_FOLDER_NAME"/"${!var}"
done
# Test on ingesting a table with lots of rows
sh "$SCRIPT_DIR"/check-num-rows-and-columns-output.sh 39999 "$OUTPUT_DIR"/"$LARGE_TABLE_BASE_ID"/"$LARGE_TABLE_TABLE_ID".json


@@ -28,6 +28,8 @@ export OMP_THREAD_LIMIT=1
./test_unstructured_ingest/test-ingest-elasticsearch.sh
#./test_unstructured_ingest/test-ingest-confluence-diff.sh
./test_unstructured_ingest/test-ingest-confluence-large.sh
./test_unstructured_ingest/test-ingest-airtable-diff.sh
./test_unstructured_ingest/test-ingest-airtable-large.sh
./test_unstructured_ingest/test-ingest-local-single-file.sh
./test_unstructured_ingest/test-ingest-local-single-file-with-encoding.sh
./test_unstructured_ingest/test-ingest-local-single-file-with-pdf-infer-table-structure.sh


@@ -1 +1 @@
__version__ = "0.9.3-dev0" # pragma: no cover
__version__ = "0.9.3-dev1" # pragma: no cover


@@ -31,6 +31,7 @@ subcommands = [
cli_cmds.elasticsearch,
cli_cmds.confluence,
cli_cmds.sharepoint,
cli_cmds.airtable,
]
for subcommand in subcommands:


@@ -1,3 +1,4 @@
from .airtable import get_cmd as airtable
from .azure import get_cmd as azure
from .biomed import get_cmd as biomed
from .box import get_cmd as box
@@ -21,6 +22,7 @@ from .slack import get_cmd as slack
from .wikipedia import get_cmd as wikipedia
__all__ = [
"airtable",
"azure",
"biomed",
"box",


@@ -0,0 +1,76 @@
import logging
import click
from unstructured.ingest.cli.common import (
add_recursive_option,
add_shared_options,
log_options,
map_to_processor_config,
map_to_standard_config,
run_init_checks,
)
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import airtable as airtable_fn
@click.command()
@click.option(
"--personal-access-token",
default=None,
help="Personal access token to authenticate into Airtable. Check: \
https://support.airtable.com/docs/creating-and-using-api-keys-and-access-tokens for more info",
)
@click.option(
"--list-of-paths",
default=None,
help="""A list of paths that specify the locations to ingest data from within Airtable.
If this argument is not set, the connector ingests all tables within each and every base.
--list-of-paths: path1 path2 path3 ...
path: base_id/table_id(optional)/view_id(optional)/
To obtain (base, table, view) ids in bulk, check:
https://airtable.com/developers/web/api/list-bases (base ids)
https://airtable.com/developers/web/api/get-base-schema (table and view ids)
https://pyairtable.readthedocs.io/en/latest/metadata.html (base, table and view ids)
To obtain specific ids from Airtable UI, go to your workspace, and copy any
relevant id from the URL structure:
https://airtable.com/appAbcDeF1ghijKlm/tblABcdEfG1HIJkLm/viwABCDEfg6hijKLM
appAbcDeF1ghijKlm -> base_id
tblABcdEfG1HIJkLm -> table_id
viwABCDEfg6hijKLM -> view_id
You can also check: https://support.airtable.com/docs/finding-airtable-ids
Here is an example for one --list-of-paths:
base1/ gets the entirety of all tables inside base1
base1/table1 gets all rows and columns within table1 in base1
base1/table1/view1 gets the rows and columns that are
visible in view1 for the table1 in base1
Examples of invalid airtable_paths:
table1 has to mention base to be valid
base1/view1 has to mention table to be valid
""",
)
def airtable(**options):
verbose = options.get("verbose", False)
ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO)
log_options(options)
try:
run_init_checks(**options)
connector_config = map_to_standard_config(options)
processor_config = map_to_processor_config(options)
airtable_fn(connector_config=connector_config, processor_config=processor_config, **options)
except Exception as e:
logger.error(e, exc_info=True)
raise click.ClickException(str(e)) from e
def get_cmd() -> click.Command:
cmd = airtable
add_shared_options(cmd)
add_recursive_option(cmd)
return cmd


@@ -0,0 +1,222 @@
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
from unstructured.utils import requires_dependencies
@dataclass
class SimpleAirtableConfig(BaseConnectorConfig):
"""Connector config where:
personal_access_token is the token used to authenticate into Airtable.
Check https://support.airtable.com/docs/airtable-api-key-deprecation-notice
for more info on authentication.
"""
personal_access_token: str
list_of_paths: Optional[str]
@dataclass
class AirtableFileMeta:
"""Metadata specifying a table id, a base id which the table is stored in,
and an optional view id in case particular rows and fields are to be ingested"""
base_id: str
table_id: str
view_id: Optional[str] = None
@dataclass
class AirtableIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
"""Class encapsulating fetching a doc and writing processed results (but not
doing the processing).
The current implementation creates an Airtable connection object
to fetch each document, rather than creating one for each thread.
"""
config: SimpleAirtableConfig
file_meta: AirtableFileMeta
@property
def filename(self):
return (
Path(self.standard_config.download_dir)
/ self.file_meta.base_id
/ f"{self.file_meta.table_id}.csv"
).resolve()
@property
def _output_filename(self):
"""Create output file path based on output directory, base id, and table id"""
output_file = f"{self.file_meta.table_id}.json"
return Path(self.standard_config.output_dir) / self.file_meta.base_id / output_file
@requires_dependencies(["pyairtable", "pandas"])
@BaseIngestDoc.skip_if_file_exists
def get_file(self):
logger.debug(f"Fetching {self} - PID: {os.getpid()}")
# TODO: instead of having a separate connection object for each doc,
# have a separate connection object for each process
import pandas as pd
from pyairtable import Api
self.api = Api(self.config.personal_access_token)
table = self.api.table(self.file_meta.base_id, self.file_meta.table_id)
df = pd.DataFrame.from_dict(
[row["fields"] for row in table.all(view=self.file_meta.view_id)],
).sort_index(axis=1)
self.document = df.to_csv()
self.filename.parent.mkdir(parents=True, exist_ok=True)
with open(self.filename, "w", encoding="utf8") as f:
f.write(self.document)
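# Airtable ids begin with a three-letter type prefix: "app" for bases,
# "tbl" for tables, and "viw" for views (see the URL structure notes above).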
airtable_id_prefixes = ["app", "tbl", "viw"]
def raise_airtable_path_error(piece):
if any(piece[:3] == prefix for prefix in airtable_id_prefixes):
raise (
ValueError(
"Path components are not correctly ordered.\
Valid path structures: \
- base_id/table_id/view_id , \
- base_id/table_id, \
- base_id .\
It is also possible to leave --airtable-list-of-paths \
argument empty (this will ingest everything).",
)
)
else:
raise (
ValueError(
"""Path components are not valid Airtable ids.
base_id should look like: appAbcDeF1ghijKlm,
table_id should look like: tblAbcDeF1ghijKlm,
view_id should look like: viwAbcDeF1ghijKlm""",
)
)
def check_path_validity(path):
pieces = path.split("/")
assert (
1 <= len(pieces) <= 3
), "Path should be composed of between 1-3 \
components (base_id, table_id, view_id)."
for i, piece in enumerate(pieces):
try:
assert piece[:3] == airtable_id_prefixes[i]
except AssertionError:
raise_airtable_path_error(piece)
@dataclass
class AirtableConnector(ConnectorCleanupMixin, BaseConnector):
"""Fetches tables or views from an Airtable org."""
config: SimpleAirtableConfig
def __init__(
self,
standard_config: StandardConnectorConfig,
config: SimpleAirtableConfig,
):
super().__init__(standard_config, config)
@requires_dependencies(["pyairtable"])
def initialize(self):
from pyairtable import Api
self.base_ids_to_fetch_tables_from = []
if self.config.list_of_paths:
self.list_of_paths = self.config.list_of_paths.split()
self.api = Api(self.config.personal_access_token)
@requires_dependencies(["pyairtable"])
def use_all_bases(self):
from pyairtable.metadata import get_api_bases
self.base_ids_to_fetch_tables_from = [
base["id"] for base in get_api_bases(self.api)["bases"]
]
@requires_dependencies(["pyairtable"])
def fetch_table_ids(self):
from pyairtable.metadata import get_base_schema
bases = [
(base_id, self.api.base(base_id)) for base_id in self.base_ids_to_fetch_tables_from
]
metadata_for_each_base = [
(base_id, get_base_schema(base)["tables"]) for base_id, base in bases
]
baseid_tableid_viewid_tuples = [
(base_id, table["id"], None)
for base_id, base_metadata in metadata_for_each_base
for table in base_metadata
]
return baseid_tableid_viewid_tuples
def get_ingest_docs(self):
"""Fetches documents in an Airtable org."""
# When no list of paths provided, the connector ingests everything.
if not self.config.list_of_paths:
self.use_all_bases()
baseid_tableid_viewid_tuples = self.fetch_table_ids()
# When there is a list of paths, the connector checks the validity
# of the paths, and fetches table_ids to be ingested, based on the paths.
else:
self.paths = self.config.list_of_paths.split()
self.paths = [path.strip("/") for path in self.paths]
[check_path_validity(path) for path in self.paths]
self.base_ids_to_fetch_tables_from = []
baseid_tableid_viewid_tuples = []
for path in self.paths:
components = path.split("/")
if len(components) == 1: # only a base_id is provided
self.base_ids_to_fetch_tables_from.append(components[0])
elif len(components) == 2: # a base_id and a table_id are provided
baseid_tableid_viewid_tuples.append((components[0], components[1], None))
elif len(components) == 3: # a base_id, table_id, and a view_id are provided
baseid_tableid_viewid_tuples.append(
(components[0], components[1], components[2]),
)
baseid_tableid_viewid_tuples += self.fetch_table_ids()
return [
AirtableIngestDoc(
self.standard_config,
self.config,
AirtableFileMeta(base_id, table_id, view_id),
)
for base_id, table_id, view_id in baseid_tableid_viewid_tuples
]


@@ -1,3 +1,4 @@
from .airtable import airtable
from .azure import azure
from .biomed import biomed
from .box import box
@@ -21,6 +22,7 @@ from .slack import slack
from .wikipedia import wikipedia
__all__ = [
"airtable",
"azure",
"biomed",
"box",


@@ -0,0 +1,43 @@
import hashlib
import logging
from typing import Optional
from unstructured.ingest.interfaces import ProcessorConfigs, StandardConnectorConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.processor import process_documents
from unstructured.ingest.runner.utils import update_download_dir_hash
def airtable(
verbose: bool,
connector_config: StandardConnectorConfig,
processor_config: ProcessorConfigs,
personal_access_token: str,
list_of_paths: Optional[str],
**kwargs,
):
ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO)
hashed_dir_name = hashlib.sha256(
personal_access_token.encode("utf-8"),
)
connector_config.download_dir = update_download_dir_hash(
connector_config=connector_config,
hashed_dir_name=hashed_dir_name,
logger=logger,
)
from unstructured.ingest.connector.airtable import (
AirtableConnector,
SimpleAirtableConfig,
)
doc_connector = AirtableConnector( # type: ignore
standard_config=connector_config,
config=SimpleAirtableConfig(
personal_access_token=personal_access_token,
list_of_paths=list_of_paths,
),
)
process_documents(doc_connector=doc_connector, processor_config=processor_config)