feat: add salesforce connector (#1168)

This commit is contained in:
David Potter 2023-09-02 08:50:31 -07:00 committed by GitHub
parent 1a0b737e9c
commit b710bafa89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1191 additions and 6 deletions

View File

@ -282,6 +282,9 @@ jobs:
MS_TENANT_ID: ${{ secrets.MS_TENANT_ID }}
MS_USER_EMAIL: ${{ secrets.MS_USER_EMAIL }}
MS_USER_PNAME: ${{ secrets.MS_USER_PNAME }}
SALESFORCE_USERNAME: ${{secrets.SALESFORCE_USERNAME}}
SALESFORCE_CONSUMER_KEY: ${{secrets.SALESFORCE_CONSUMER_KEY}}
SALESFORCE_PRIVATE_KEY: ${{secrets.SALESFORCE_PRIVATE_KEY}}
SHAREPOINT_CLIENT_ID: ${{secrets.SHAREPOINT_CLIENT_ID}}
SHAREPOINT_CRED: ${{secrets.SHAREPOINT_CRED}}
SHAREPOINT_SITE: ${{secrets.SHAREPOINT_SITE}}
@ -313,6 +316,7 @@ jobs:
make install-ingest-gitlab
make install-ingest-onedrive
make install-ingest-outlook
make install-ingest-salesforce
make install-ingest-slack
make install-ingest-wikipedia
make install-ingest-notion

View File

@ -73,6 +73,9 @@ jobs:
MS_TENANT_ID: ${{ secrets.MS_TENANT_ID }}
MS_USER_EMAIL: ${{ secrets.MS_USER_EMAIL }}
MS_USER_PNAME: ${{ secrets.MS_USER_PNAME }}
SALESFORCE_USERNAME: ${{secrets.SALESFORCE_USERNAME}}
SALESFORCE_CONSUMER_KEY: ${{secrets.SALESFORCE_CONSUMER_KEY}}
SALESFORCE_PRIVATE_KEY: ${{secrets.SALESFORCE_PRIVATE_KEY}}
SHAREPOINT_CLIENT_ID: ${{secrets.SHAREPOINT_CLIENT_ID}}
SHAREPOINT_CRED: ${{secrets.SHAREPOINT_CRED}}
SHAREPOINT_SITE: ${{secrets.SHAREPOINT_SITE}}
@ -104,6 +107,7 @@ jobs:
make install-ingest-gitlab
make install-ingest-onedrive
make install-ingest-outlook
make install-ingest-salesforce
make install-ingest-slack
make install-ingest-wikipedia
make install-ingest-notion

View File

@ -1,3 +1,13 @@
## 0.10.12-dev3
### Enhancements
### Features
* Add Salesforce Connector to be able to pull Account, Case, Campaign, EmailMessage, Lead
### Fixes
## 0.10.12-dev2
### Enhancements

View File

@ -188,6 +188,10 @@ install-ingest-local:
install-ingest-notion:
python3 -m pip install -r requirements/ingest-notion.txt
.PHONY: install-ingest-salesforce
install-ingest-salesforce:
python3 -m pip install -r requirements/ingest-salesforce.txt
.PHONY: install-unstructured-inference
install-unstructured-inference:
python3 -m pip install -r requirements/local-inference.txt

View File

@ -27,6 +27,7 @@ in our community `Slack. <https://join.slack.com/t/unstructuredw-kbe4326/shared_
upstream_connectors/outlook
upstream_connectors/reddit
upstream_connectors/s3
upstream_connectors/salesforce
upstream_connectors/sharepoint
upstream_connectors/slack
upstream_connectors/wikipedia

View File

@ -0,0 +1,129 @@
Salesforce
==========
Connect Salesforce to your preprocessing pipeline, and batch process Salesforce data using ``unstructured-ingest`` to store structured outputs locally on your filesystem.
First you'll need to install the Salesforce dependencies as shown here.
.. code:: shell
pip install "unstructured[salesforce]"
Run Locally
-----------
.. tabs::
.. tab:: Shell
.. code:: shell
unstructured-ingest \
salesforce \
--salesforce-username "$SALESFORCE_USERNAME" \
--salesforce-consumer-key "$SALESFORCE_CONSUMER_KEY" \
--salesforce-private-key-path "$SALESFORCE_PRIVATE_KEY_PATH" \
--salesforce-categories "EmailMessage,Account,Lead,Case,Campaign" \
--structured-output-dir salesforce-output \
--num-processes 2 \
--recursive \
--verbose
.. tab:: Python
.. code:: python
import subprocess
command = [
"unstructured-ingest",
"salesforce",
"--salesforce-username" "$SALESFORCE_USERNAME"
"--salesforce-consumer-key" "$SALESFORCE_CONSUMER_KEY"
"--salesforce-private-key-path" "$SALESFORCE_PRIVATE_KEY_PATH"
"--salesforce-categories" "EmailMessage,Account,Lead,Case,Campaign"
"--structured-output-dir" "salesforce-output"
"--box_app_config", "$BOX_APP_CONFIG_PATH"
"--remote-url", "box://utic-test-ingest-fixtures"
"--structured-output-dir", "box-output"
"--num-processes", "2"
"--recursive",
"--verbose",
]
# Run the command
process = subprocess.Popen(command, stdout=subprocess.PIPE)
output, error = process.communicate()
# Print output
if process.returncode == 0:
print('Command executed successfully. Output:')
print(output.decode())
else:
print('Command failed. Error:')
print(error.decode())
Run via the API
---------------
You can also use upstream connectors with the ``unstructured`` API. For this you'll need to use the ``--partition-by-api`` flag and pass in your API key with ``--api-key``.
.. tabs::
.. tab:: Shell
.. code:: shell
unstructured-ingest \
salesforce \
--salesforce-username "$SALESFORCE_USERNAME" \
--salesforce-consumer-key "$SALESFORCE_CONSUMER_KEY" \
--salesforce-private-key-path "$SALESFORCE_PRIVATE_KEY_PATH" \
--salesforce-categories "EmailMessage,Account,Lead,Case,Campaign" \
--structured-output-dir salesforce-output \
--num-processes 2 \
--recursive \
--verbose
--partition-by-api \
--api-key "<UNSTRUCTURED-API-KEY>"
.. tab:: Python
.. code:: python
import subprocess
command = [
"unstructured-ingest",
"salesforce",
"--salesforce-username" "$SALESFORCE_USERNAME"
"--salesforce-consumer-key" "$SALESFORCE_CONSUMER_KEY"
"--salesforce-private-key-path" "$SALESFORCE_PRIVATE_KEY_PATH"
"--salesforce-categories" "EmailMessage,Account,Lead,Case,Campaign"
"--structured-output-dir" "salesforce-output"
"--box_app_config", "$BOX_APP_CONFIG_PATH"
"--remote-url", "box://utic-test-ingest-fixtures"
"--structured-output-dir", "box-output"
"--num-processes", "2"
"--recursive",
"--verbose",
"--partition-by-api",
"--api-key", "<UNSTRUCTURED-API-KEY>",
]
# Run the command
process = subprocess.Popen(command, stdout=subprocess.PIPE)
output, error = process.communicate()
# Print output
if process.returncode == 0:
print('Command executed successfully. Output:')
print(output.decode())
else:
print('Command failed. Error:')
print(error.decode())
Additionaly, you will need to pass the ``--partition-endpoint`` if you're running the API locally. You can find more information about the ``unstructured`` API `here <https://github.com/Unstructured-IO/unstructured-api>`_.
For a full list of the options the CLI accepts check ``unstructured-ingest salesforce --help``.
NOTE: Keep in mind that you will need to have all the appropriate extras and dependencies for the file types of the documents contained in your data storage platform if you're running this locally. You can find more information about this in the `installation guide <https://unstructured-io.github.io/unstructured/installing.html>`_.

View File

@ -12,7 +12,7 @@
# Maybe check 'Make api calls as the as-user header'
# REAUTHORIZE app after making any of the above changes
# box_app_config is the path to a json file, available in the App Settings section of your Box App
# box-app-config is the path to a json file, available in the App Settings section of your Box App
# More info to set up the app:
# https://developer.box.com/guides/authentication/jwt/jwt-setup/
# and set up the app config.json file here:
@ -24,7 +24,7 @@ cd "$SCRIPT_DIR"/../../.. || exit 1
PYTHONPATH=. ./unstructured/ingest/main.py \
box \
--box_app_config "$BOX_APP_CONFIG_PATH" \
--box-app-config "$BOX_APP_CONFIG_PATH" \
--remote-url box://utic-test-ingest-fixtures \
--structured-output-dir box-output \
--num-processes 2 \

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
# Processes multiple files in a nested folder structure from Salesforce
# through Unstructured's library in 2 processes.
# Available categories are: Account, Case, Campaign, EmailMessage, Lead
# Structured outputs are stored in salesforce-output/
# Using JWT authorization
# https://developer.salesforce.com/docs/atlas.en-us.sfdx_dev.meta/sfdx_dev/sfdx_dev_auth_key_and_cert.htm
# https://developer.salesforce.com/docs/atlas.en-us.sfdx_dev.meta/sfdx_dev/sfdx_dev_auth_connected_app.htm
# private-key-path is the path to the key file
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd "$SCRIPT_DIR"/../../.. || exit 1
PYTHONPATH=. ./unstructured/ingest/main.py \
salesforce \
--username "$SALESFORCE_USERNAME" \
--consumer-key "$SALESFORCE_CONSUMER_KEY" \
--private-key-path "$SALESFORCE_PRIVATE_KEY_PATH" \
--categories "EmailMessage,Account,Lead,Case,Campaign" \
--structured-output-dir salesforce-output \
--preserve-downloads \
--reprocess \
--verbose

View File

@ -0,0 +1,3 @@
-c constraints.in
-c base.txt
simple-salesforce

View File

@ -0,0 +1,63 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile requirements/ingest-salesforce.in
#
attrs==23.1.0
# via zeep
certifi==2023.7.22
# via
# -c requirements/base.txt
# -c requirements/constraints.in
# requests
cffi==1.15.1
# via cryptography
charset-normalizer==3.2.0
# via
# -c requirements/base.txt
# requests
cryptography==41.0.3
# via simple-salesforce
idna==3.4
# via
# -c requirements/base.txt
# requests
isodate==0.6.1
# via zeep
lxml==4.9.3
# via
# -c requirements/base.txt
# zeep
platformdirs==3.10.0
# via zeep
pycparser==2.21
# via cffi
pyjwt==2.8.0
# via simple-salesforce
pytz==2023.3
# via zeep
requests==2.31.0
# via
# -c requirements/base.txt
# requests-file
# requests-toolbelt
# simple-salesforce
# zeep
requests-file==1.5.1
# via zeep
requests-toolbelt==1.0.0
# via zeep
simple-salesforce==1.12.4
# via -r requirements/ingest-salesforce.in
six==1.16.0
# via
# isodate
# requests-file
urllib3==1.26.16
# via
# -c requirements/base.txt
# -c requirements/constraints.in
# requests
zeep==4.2.1
# via simple-salesforce

View File

@ -148,6 +148,7 @@ setup(
"airtable": load_requirements("requirements/ingest-airtable.in"),
"sharepoint": load_requirements("requirements/ingest-sharepoint.in"),
"delta-table": load_requirements("requirements/ingest-delta-table.in"),
"salesforce": load_requirements("requirements/ingest-salesforce.in"),
# Legacy extra requirements
"huggingface": load_requirements("requirements/huggingface.in"),
"local-inference": all_doc_reqs,

View File

@ -0,0 +1,101 @@
[
{
"type": "UncategorizedText",
"element_id": "d6a8689a12ad0cd0314b04e1c2cee3c9",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Id: 701Hu000001eX9EIAU"
},
{
"type": "Title",
"element_id": "69d9d94f0bc4b8d425fa99dce2b78311",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Name: GC Product Webinar - Jan 7, 2002"
},
{
"type": "Title",
"element_id": "f80d0033c7e5a8d6ae66778815e33f35",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Type: Webinar"
},
{
"type": "Title",
"element_id": "ad1b8a8ebbde05c57a773f60045de6f6",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Status: Completed"
},
{
"type": "UncategorizedText",
"element_id": "08336889c7ebb4ba297a396eb072d83c",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "StartDate: 2023-01-29"
},
{
"type": "UncategorizedText",
"element_id": "887de29c98087cc9d07b242432cff930",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "EndDate: 2023-01-29"
},
{
"type": "Title",
"element_id": "ffe62693f34276315c62d28b06005bcf",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "BudgetedCost: 10000.0"
},
{
"type": "Title",
"element_id": "5739187b01834fedcc2362b5d3841d07",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "ActualCost: 11400.0"
},
{
"type": "Title",
"element_id": "8750ec1f6f59b282d104b919c7ffab0f",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Description: None"
},
{
"type": "Title",
"element_id": "54e16b100bf2118d4c7c18c3f93e2223",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "NumberOfLeads: 0"
},
{
"type": "Title",
"element_id": "db1ed63f2ed83e51f75619047c417e49",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "NumberOfConvertedLeads: 0"
}
]

View File

@ -0,0 +1,101 @@
[
{
"type": "UncategorizedText",
"element_id": "582957afcab0b1f0df8e414c601b679a",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Id: 701Hu000001eX9FIAU"
},
{
"type": "Title",
"element_id": "b1097935922e3ea926a35ace5fe68a61",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Name: User Conference - Jun 17-19, 2002"
},
{
"type": "Title",
"element_id": "0c29c88f4b31c6f5caf8b36885c4c1c6",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Type: Conference"
},
{
"type": "Title",
"element_id": "b537e10ee8e78ec7a18792eaa76ce0e4",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Status: Planned"
},
{
"type": "UncategorizedText",
"element_id": "6cbe5bf59dabd290307e79547b2a86f2",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "StartDate: 2023-07-09"
},
{
"type": "UncategorizedText",
"element_id": "6cb3c2919b2c47be4b187d17c316a539",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "EndDate: 2023-07-11"
},
{
"type": "Title",
"element_id": "3062c8ae2aae8afe4e38d8fa2a6ea248",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "BudgetedCost: 100000.0"
},
{
"type": "Title",
"element_id": "754630afa9bba639e59a8a80785f2766",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "ActualCost: None"
},
{
"type": "Title",
"element_id": "8750ec1f6f59b282d104b919c7ffab0f",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Description: None"
},
{
"type": "Title",
"element_id": "54e16b100bf2118d4c7c18c3f93e2223",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "NumberOfLeads: 0"
},
{
"type": "Title",
"element_id": "db1ed63f2ed83e51f75619047c417e49",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "NumberOfConvertedLeads: 0"
}
]

View File

@ -0,0 +1,101 @@
[
{
"type": "UncategorizedText",
"element_id": "153e1fa63953e7e19f9004e0253eab68",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Id: 701Hu000001eX9GIAU"
},
{
"type": "Title",
"element_id": "9b9c5e71eff6a483e85da52d1a1f1005",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Name: DM Campaign to Top Customers - Nov 12-23, 2001"
},
{
"type": "Title",
"element_id": "28fa7658294d152358d23d8bde3c9e56",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Type: Direct Mail"
},
{
"type": "Title",
"element_id": "ad1b8a8ebbde05c57a773f60045de6f6",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Status: Completed"
},
{
"type": "UncategorizedText",
"element_id": "59b06bd535fc60e446ce4f6db6392a8d",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "StartDate: 2022-12-04"
},
{
"type": "UncategorizedText",
"element_id": "a7caa29a82d158a422b901babed10321",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "EndDate: 2022-12-15"
},
{
"type": "Title",
"element_id": "a830028696ccdc6c73f26e2f5f0b3e0d",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "BudgetedCost: 25000.0"
},
{
"type": "Title",
"element_id": "f393ef1a67e6b638d6825e00ffa85b5e",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "ActualCost: 23500.0"
},
{
"type": "Title",
"element_id": "8750ec1f6f59b282d104b919c7ffab0f",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Description: None"
},
{
"type": "Title",
"element_id": "54e16b100bf2118d4c7c18c3f93e2223",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "NumberOfLeads: 0"
},
{
"type": "Title",
"element_id": "db1ed63f2ed83e51f75619047c417e49",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "NumberOfConvertedLeads: 0"
}
]

View File

@ -0,0 +1,101 @@
[
{
"type": "UncategorizedText",
"element_id": "697cb5681a4f17c6cb712dfce64ae2d1",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Id: 701Hu000001eX9HIAU"
},
{
"type": "Title",
"element_id": "37c6c6fcf92fd42c703ad967a4691a32",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Name: International Electrical Engineers Association Trade Show - Mar 4-5, 2002"
},
{
"type": "Title",
"element_id": "a4b5a79024228eb84bcefe4bfe8bce47",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Type: Trade Show"
},
{
"type": "Title",
"element_id": "b537e10ee8e78ec7a18792eaa76ce0e4",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Status: Planned"
},
{
"type": "UncategorizedText",
"element_id": "5d68899808565a0eb340e7ce9a42c981",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "StartDate: 2023-03-26"
},
{
"type": "UncategorizedText",
"element_id": "eb23b79d3e286bef615fa4bf7bbe6c6d",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "EndDate: 2023-03-27"
},
{
"type": "Title",
"element_id": "2bb64865cdfea5db0f000dde162fc372",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "BudgetedCost: 50000.0"
},
{
"type": "Title",
"element_id": "754630afa9bba639e59a8a80785f2766",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "ActualCost: None"
},
{
"type": "Title",
"element_id": "8750ec1f6f59b282d104b919c7ffab0f",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "Description: None"
},
{
"type": "Title",
"element_id": "54e16b100bf2118d4c7c18c3f93e2223",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "NumberOfLeads: 0"
},
{
"type": "Title",
"element_id": "db1ed63f2ed83e51f75619047c417e49",
"metadata": {
"data_source": {},
"filetype": "text/plain"
},
"text": "NumberOfConvertedLeads: 0"
}
]

View File

@ -0,0 +1,18 @@
[
{
"type": "NarrativeText",
"element_id": "d954fa8e82ded23ebde30b2d53d5f81d",
"metadata": {
"data_source": {},
"filetype": "message/rfc822",
"sent_from": [
"devops+salesforce-connector@unstructured.io"
],
"sent_to": [
"jane_gray@uoa.edu"
],
"subject": "Test of email 1"
},
"text": "Jane. This is a test of sending you an email from Salesforce!\n\n_____________________________________________________________________\nPowered by Salesforce\nhttp://www.salesforce.com/"
}
]

View File

@ -0,0 +1,18 @@
[
{
"type": "NarrativeText",
"element_id": "f5ac98aa9002453f536877714c5eb88d",
"metadata": {
"data_source": {},
"filetype": "message/rfc822",
"sent_from": [
"devops+salesforce-connector@unstructured.io"
],
"sent_to": [
"sean@edge.com"
],
"subject": "Test of Salesforce 2"
},
"text": "Hey Sean.\n\nTesting email parsing here.\nType: email\n\nJust testing the email system\n\n_____________________________________________________________________\nPowered by Salesforce\nhttp://www.salesforce.com/"
}
]

View File

@ -27,13 +27,12 @@ PYTHONPATH=. ./unstructured/ingest/main.py \
--download-dir "$DOWNLOAD_DIR" \
--box-app-config "$BOX_APP_CONFIG_PATH" \
--remote-url box://utic-test-ingest-fixtures \
--structured-output-dir box-output \
--structured-output-dir "$OUTPUT_DIR" \
--metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified \
--num-processes 2 \
--preserve-downloads \
--recursive \
--reprocess \
--structured-output-dir "$OUTPUT_DIR" \
--verbose
sh "$SCRIPT_DIR"/check-diff-expected-output.sh $OUTPUT_FOLDER_NAME

View File

@ -0,0 +1,40 @@
#!/usr/bin/env bash
# Set either SALESFORCE_PRIVATE_KEY (app config json content as string) or
# SALESFORCE_PRIVATE_KEY_PATH (path to app config json file) env var
set -e
SCRIPT_DIR=$(dirname "$(realpath "$0")")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=salesforce
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
if [ -z "$SALESFORCE_PRIVATE_KEY" ] && [ -z "$SALESFORCE_PRIVATE_KEY_PATH" ]; then
echo "Skipping Salesforce ingest test because neither SALESFORCE_PRIVATE_KEY nor SALESFORCE_PRIVATE_KEY_PATH env vars are set."
exit 0
fi
if [ -z "$SALESFORCE_PRIVATE_KEY_PATH" ]; then
# Create temporary service key file
SALESFORCE_PRIVATE_KEY_PATH=$(mktemp)
echo "$SALESFORCE_PRIVATE_KEY" >"$SALESFORCE_PRIVATE_KEY_PATH"
fi
PYTHONPATH=. ./unstructured/ingest/main.py \
salesforce \
--categories "EmailMessage,Campaign" \
--download-dir "$DOWNLOAD_DIR" \
--username "$SALESFORCE_USERNAME" \
--consumer-key "$SALESFORCE_CONSUMER_KEY" \
--private-key-path "$SALESFORCE_PRIVATE_KEY_PATH" \
--metadata-exclude coordinates,filename,file_directory,metadata.data_source.date_processed,metadata.last_modified \
--num-processes 2 \
--preserve-downloads \
--recursive \
--reprocess \
--structured-output-dir "$OUTPUT_DIR" \
--verbose
sh "$SCRIPT_DIR"/check-diff-expected-output.sh $OUTPUT_FOLDER_NAME

View File

@ -35,6 +35,7 @@ export OMP_THREAD_LIMIT=1
./test_unstructured_ingest/test-ingest-local-single-file-with-pdf-infer-table-structure.sh
./test_unstructured_ingest/test-ingest-notion.sh
./test_unstructured_ingest/test-ingest-delta-table.sh
./test_unstructured_ingest/test-ingest-salesforce.sh
# NOTE(yuming): The following test should be put after any tests with --preserve-downloads option
./test_unstructured_ingest/test-ingest-pdf-fast-reprocess.sh
./test_unstructured_ingest/test-ingest-sharepoint.sh

View File

@ -1 +1 @@
__version__ = "0.10.12-dev2" # pragma: no cover
__version__ = "0.10.12-dev3" # pragma: no cover

View File

@ -32,6 +32,7 @@ subcommands = [
cli_cmds.confluence,
cli_cmds.sharepoint,
cli_cmds.airtable,
cli_cmds.salesforce,
]
for subcommand in subcommands:

View File

@ -23,6 +23,7 @@ from .outlook import get_cmd as outlook
from .reddit import get_cmd as reddit
from .s3_2 import get_dest_cmd as s3_dest
from .s3_2 import get_source_cmd as s3
from .salesforce import get_cmd as salesforce
from .sharepoint import get_cmd as sharepoint
from .slack import get_cmd as slack
from .wikipedia import get_cmd as wikipedia
@ -52,6 +53,7 @@ __all__ = [
"outlook",
"reddit",
"s3",
"salesforce",
"sharepoint",
"slack",
"wikipedia",

View File

@ -0,0 +1,62 @@
import logging
import click
from unstructured.ingest.cli.common import (
add_recursive_option,
add_shared_options,
log_options,
map_to_processor_config,
map_to_standard_config,
run_init_checks,
)
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.runner import salesforce as salesforce_fn
@click.command()
@click.option(
"--categories",
default=None,
required=True,
help="Comma separated list of Salesforce categories to download. "
"Currently only Account, Case, Campaign, EmailMessage, Lead.",
)
@click.option(
"--username",
required=True,
help="Salesforce username usually looks like an email.",
)
@click.option(
"--consumer-key",
required=True,
help="For the Salesforce JWT auth. Found in Consumer Details.",
)
@click.option(
"--private-key-path",
required=True,
help="Path to the private key for the Salesforce JWT auth. Usually named server.key.",
)
def salesforce(**options):
verbose = options.get("verbose", False)
ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO)
log_options(options)
try:
run_init_checks(**options)
connector_config = map_to_standard_config(options)
processor_config = map_to_processor_config(options)
salesforce_fn(
connector_config=connector_config,
processor_config=processor_config,
**options,
)
except Exception as e:
logger.error(e, exc_info=True)
raise click.ClickException(str(e)) from e
def get_cmd() -> click.Command:
cmd = salesforce
add_recursive_option(cmd)
add_shared_options(cmd)
return cmd

View File

@ -25,6 +25,7 @@ from unstructured.ingest.connector.onedrive import OneDriveIngestDoc
from unstructured.ingest.connector.outlook import OutlookIngestDoc
from unstructured.ingest.connector.reddit import RedditIngestDoc
from unstructured.ingest.connector.s3_2 import S3IngestDoc
from unstructured.ingest.connector.salesforce import SalesforceIngestDoc
from unstructured.ingest.connector.sharepoint import SharepointIngestDoc
from unstructured.ingest.connector.slack import SlackIngestDoc
from unstructured.ingest.connector.wikipedia import (
@ -55,6 +56,7 @@ INGEST_DOC_NAME_TO_CLASS: Dict[str, Type[DataClassJsonMixin]] = {
"outlook": OutlookIngestDoc,
"reddit": RedditIngestDoc,
"s3": S3IngestDoc,
"salesforce": SalesforceIngestDoc,
"sharepoint": SharepointIngestDoc,
"slack": SlackIngestDoc,
"wikipedia_html": WikipediaIngestHTMLDoc,

View File

@ -0,0 +1,340 @@
"""
Salesforce Connector
Able to download Account, Case, Campaign, EmailMessage, Lead
Salesforce returns everything as a list of json.
This saves each entry as a separate file to be partitioned.
Using JWT authorization
https://developer.salesforce.com/docs/atlas.en-us.sfdx_dev.meta/sfdx_dev/sfdx_dev_auth_key_and_cert.htm
https://developer.salesforce.com/docs/atlas.en-us.sfdx_dev.meta/sfdx_dev/sfdx_dev_auth_connected_app.htm
"""
import os
from dataclasses import dataclass
from email.utils import formatdate
from pathlib import Path
from string import Template
from textwrap import dedent
from typing import Any, Dict, List, Type
from dateutil import parser # type: ignore
from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
ConnectorCleanupMixin,
IngestDocCleanupMixin,
StandardConnectorConfig,
)
from unstructured.ingest.logger import logger
from unstructured.utils import requires_dependencies
class MissingCategoryError(Exception):
"""There are no categories with that name."""
ACCEPTED_CATEGORIES = ["Account", "Case", "Campaign", "EmailMessage", "Lead"]
EMAIL_TEMPLATE = Template(
"""MIME-Version: 1.0
Date: $date
Message-ID: $message_identifier
Subject: $subject
From: $from_email
To: $to_email
Content-Type: multipart/alternative; boundary="00000000000095c9b205eff92630"
--00000000000095c9b205eff92630
Content-Type: text/plain; charset="UTF-8"
$textbody
--00000000000095c9b205eff92630
Content-Type: text/html; charset="UTF-8"
$textbody
--00000000000095c9b205eff92630--
""",
)
ACCOUNT_TEMPLATE = Template(
"""Id: $id
Name: $name
Type: $account_type
Phone: $phone
AccountNumber: $account_number
Website: $website
Industry: $industry
AnnualRevenue: $annual_revenue
NumberOfEmployees: $number_employees
Ownership: $ownership
TickerSymbol: $ticker_symbol
Description: $description
Rating: $rating
DandbCompanyId: $dnb_id
""",
)
LEAD_TEMPLATE = Template(
"""Id: $id
Name: $name
Title: $title
Company: $company
Phone: $phone
Email: $email
Website: $website
Description: $description
LeadSource: $lead_source
Rating: $rating
Status: $status
Industry: $industry
""",
)
CASE_TEMPLATE = Template(
"""Id: $id
Type: $type
Status: $status
Reason: $reason
Origin: $origin
Subject: $subject
Priority: $priority
Description: $description
Comments: $comments
""",
)
CAMPAIGN_TEMPLATE = Template(
"""Id: $id
Name: $name
Type: $type
Status: $status
StartDate: $start_date
EndDate: $end_date
BudgetedCost: $budgeted_cost
ActualCost: $actual_cost
Description: $description
NumberOfLeads: $number_of_leads
NumberOfConvertedLeads: $number_of_converted_leads
""",
)
@dataclass
class SimpleSalesforceConfig(BaseConnectorConfig):
"""Connector specific attributes"""
categories: List[str]
username: str
consumer_key: str
private_key_path: str
recursive: bool = False
@staticmethod
def parse_folders(folder_str: str) -> List[str]:
"""Parses a comma separated string of Outlook folders into a list."""
return [x.strip() for x in folder_str.split(",")]
@requires_dependencies(["simple_salesforce"], extras="salesforce")
def _get_client(self):
from simple_salesforce import Salesforce
return Salesforce(
username=self.username,
consumer_key=self.consumer_key,
privatekey_file=self.private_key_path,
)
@dataclass
class SalesforceIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
record_type: str
record_id: str
config: SimpleSalesforceConfig
registry_name: str = "salesforce"
def _tmp_download_file(self) -> Path:
if self.record_type == "EmailMessage":
record_file = self.record_id + ".eml"
elif self.record_type in ["Account", "Lead", "Case", "Campaign"]:
record_file = self.record_id + ".txt"
else:
raise MissingCategoryError(
f"There are no categories with the name: {self.record_type}",
)
return Path(self.standard_config.download_dir) / self.record_type / record_file
@property
def _output_filename(self) -> Path:
record_file = self.record_id + ".json"
return Path(self.standard_config.output_dir) / self.record_type / record_file
def _create_full_tmp_dir_path(self):
self._tmp_download_file().parent.mkdir(parents=True, exist_ok=True)
def create_account(self, account_json: Dict[str, Any]) -> str:
"""Creates partitionable account file"""
account = ACCOUNT_TEMPLATE.substitute(
id=account_json.get("Id"),
name=account_json.get("Name"),
account_type=account_json.get("Type"),
phone=account_json.get("Phone"),
account_number=account_json.get("AccountNumber"),
website=account_json.get("Website"),
industry=account_json.get("Industry"),
annual_revenue=account_json.get("AnnualRevenue"),
number_employees=account_json.get("NumberOfEmployees"),
ownership=account_json.get("Ownership"),
ticker_symbol=account_json.get("TickerSymbol"),
description=account_json.get("Description"),
rating=account_json.get("Rating"),
dnb_id=account_json.get("DandbCompanyId"),
)
return dedent(account)
def create_lead(self, lead_json: Dict[str, Any]) -> str:
"""Creates partitionable lead file"""
lead = LEAD_TEMPLATE.substitute(
id=lead_json.get("Id"),
name=lead_json.get("Name"),
title=lead_json.get("Title"),
company=lead_json.get("Company"),
phone=lead_json.get("Phone"),
email=lead_json.get("Email"),
website=lead_json.get("Website"),
description=lead_json.get("Description"),
lead_source=lead_json.get("LeadSource"),
rating=lead_json.get("Rating"),
status=lead_json.get("Status"),
industry=lead_json.get("Industry"),
)
return dedent(lead)
def create_case(self, case_json: Dict[str, Any]) -> str:
"""Creates partitionable case file"""
case = CASE_TEMPLATE.substitute(
id=case_json.get("Id"),
type=case_json.get("Type"),
status=case_json.get("Status"),
reason=case_json.get("Reason"),
origin=case_json.get("Origin"),
subject=case_json.get("Subject"),
priority=case_json.get("Priority"),
description=case_json.get("Description"),
comments=case_json.get("Comments"),
)
return dedent(case)
def create_campaign(self, campaign_json: Dict[str, Any]) -> str:
"""Creates partitionable campaign file"""
campaign = CAMPAIGN_TEMPLATE.substitute(
id=campaign_json.get("Id"),
name=campaign_json.get("Name"),
type=campaign_json.get("Type"),
status=campaign_json.get("Status"),
start_date=campaign_json.get("StartDate"),
end_date=campaign_json.get("EndDate"),
budgeted_cost=campaign_json.get("BudgetedCost"),
actual_cost=campaign_json.get("ActualCost"),
description=campaign_json.get("Description"),
number_of_leads=campaign_json.get("NumberOfLeads"),
number_of_converted_leads=campaign_json.get("NumberOfConvertedLeads"),
)
return dedent(campaign)
def create_eml(self, email_json: Dict[str, Any]) -> str:
"""Recreates standard expected .eml format using template."""
eml = EMAIL_TEMPLATE.substitute(
date=formatdate(parser.parse(email_json.get("MessageDate")).timestamp()),
message_identifier=email_json.get("MessageIdentifier"),
subject=email_json.get("Subject"),
from_email=email_json.get("FromAddress"),
to_email=email_json.get("ToAddress"),
textbody=email_json.get("TextBody"),
)
return dedent(eml)
@BaseIngestDoc.skip_if_file_exists
def get_file(self):
"""Saves individual json records locally."""
self._create_full_tmp_dir_path()
logger.debug(f"Writing file {self.record_id} - PID: {os.getpid()}")
client = self.config._get_client()
# Get record from Salesforce based on id
record = client.query_all(
f"select FIELDS(STANDARD) from {self.record_type} where Id='{self.record_id}'",
)["records"][0]
try:
if self.record_type == "EmailMessage":
formatted_record = self.create_eml(record)
elif self.record_type == "Account":
formatted_record = self.create_account(record)
elif self.record_type == "Lead":
formatted_record = self.create_lead(record)
elif self.record_type == "Case":
formatted_record = self.create_case(record)
elif self.record_type == "Campaign":
formatted_record = self.create_campaign(record)
with open(self._tmp_download_file(), "w") as page_file:
page_file.write(formatted_record)
except Exception as e:
logger.error(
f"Error while downloading and saving file: {self.record_id}.",
)
logger.error(e)
@property
def filename(self):
"""The filename of the file created from a Salesforce record"""
return self._tmp_download_file()
class SalesforceConnector(ConnectorCleanupMixin, BaseConnector):
ingest_doc_cls: Type[SalesforceIngestDoc] = SalesforceIngestDoc
config: SimpleSalesforceConfig
def __init__(
self,
config: SimpleSalesforceConfig,
standard_config: StandardConnectorConfig,
) -> None:
super().__init__(standard_config, config)
def initialize(self):
pass
@requires_dependencies(["simple_salesforce"], extras="salesforce")
def get_ingest_docs(self) -> List[SalesforceIngestDoc]:
"""Get Salesforce Ids for the records.
Send them to next phase where each doc gets downloaded into the
appropriate format for partitioning.
"""
from simple_salesforce.exceptions import SalesforceMalformedRequest
client = self.config._get_client()
ingest_docs = []
for record_type in self.config.categories:
if record_type not in ACCEPTED_CATEGORIES:
raise ValueError(f"{record_type} not currently an accepted Salesforce category")
try:
# Get ids from Salesforce
records = client.query_all(
f"select Id from {record_type}",
)
for record in records["records"]:
ingest_docs.append(
SalesforceIngestDoc(
self.standard_config,
self.config,
record_type,
record["Id"],
),
)
except SalesforceMalformedRequest as e:
raise SalesforceMalformedRequest(f"Problem with Salesforce query: {e}")
return ingest_docs

View File

@ -76,7 +76,8 @@ class Processor:
# Debugging tip: use the below line and comment out the mp.Pool loop
# block to remain in single process
# self.doc_processor_fn(docs[0])
# json_docs = [doc.to_json() for doc in docs]
# self.doc_processor_fn(json_docs[0])
logger.info(f"Processing {len(docs)} docs")
json_docs = [doc.to_json() for doc in docs]
try:

View File

@ -18,6 +18,7 @@ from .onedrive import onedrive
from .outlook import outlook
from .reddit import reddit
from .s3 import s3
from .salesforce import salesforce
from .sharepoint import sharepoint
from .slack import slack
from .wikipedia import wikipedia
@ -43,6 +44,7 @@ __all__ = [
"outlook",
"reddit",
"s3",
"salesforce",
"sharepoint",
"slack",
"wikipedia",

View File

@ -0,0 +1,47 @@
import hashlib
import logging
from unstructured.ingest.interfaces import ProcessorConfigs, StandardConnectorConfig
from unstructured.ingest.logger import ingest_log_streaming_init, logger
from unstructured.ingest.processor import process_documents
from unstructured.ingest.runner.utils import update_download_dir_hash
def salesforce(
verbose: bool,
connector_config: StandardConnectorConfig,
processor_config: ProcessorConfigs,
recursive: bool,
categories: str,
username: str,
consumer_key: str,
private_key_path: str,
**kwargs,
):
ingest_log_streaming_init(logging.DEBUG if verbose else logging.INFO)
hashed_dir_name = hashlib.sha256(username.encode("utf-8"))
connector_config.download_dir = update_download_dir_hash(
connector_name="salesforce",
connector_config=connector_config,
hashed_dir_name=hashed_dir_name,
logger=logger,
)
from unstructured.ingest.connector.salesforce import (
SalesforceConnector,
SimpleSalesforceConfig,
)
doc_connector = SalesforceConnector( # type: ignore
standard_config=connector_config,
config=SimpleSalesforceConfig(
categories=SimpleSalesforceConfig.parse_folders(categories),
username=username,
consumer_key=consumer_key,
private_key_path=private_key_path,
recursive=recursive,
),
)
process_documents(doc_connector=doc_connector, processor_config=processor_config)