Slack connector (#462)

This connector takes a Slack channel ID, a token, and other options, pulls the
conversation history for the channel, and stores it as a text file that is then
processed by unstructured into the expected structured output.
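
A minimal sketch of how the pieces added here fit together, assuming the package is installed with the slack extra; the channel ID and token are placeholders (the supported entry point is unstructured/ingest/main.py, shown further down):

from unstructured.ingest.connector.slack import SimpleSlackConfig, SlackConnector

# Placeholder channel ID and bot token; the token needs the channels:history scope.
config = SimpleSlackConfig(
    channels=SimpleSlackConfig.parse_channels("C052BGT7718"),
    token="xoxb-placeholder",
    oldest="2023-04-01",
    latest="2023-04-08",
    download_dir="slack-ingest-download",
    output_dir="slack-ingest-output",
)
connector = SlackConnector(config=config)
for doc in connector.get_ingest_docs():
    doc.get_file()  # writes slack-ingest-download/<channel>.txt for partitioning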
Trevor Bossert 2023-04-16 12:34:43 -07:00 committed by GitHub
parent a11563fe63
commit cff7f4fd5a
15 changed files with 609 additions and 27 deletions

View File

@@ -103,6 +103,7 @@ jobs:
- name: Test
env:
GH_READ_ONLY_ACCESS_TOKEN: ${{ secrets.GH_READ_ONLY_ACCESS_TOKEN }}
SLACK_TOKEN: ${{ secrets.SLACK_TOKEN }}
run: |
source .venv/bin/activate
make install-detectron2
@@ -117,6 +118,7 @@ jobs:
make install-ingest-azure
make install-ingest-github
make install-ingest-gitlab
make install-ingest-slack
make install-ingest-wikipedia
./test_unstructured_ingest/test-ingest.sh

View File

@@ -1,4 +1,4 @@
## 0.5.13-dev5
## 0.5.13-dev6
### Enhancements
@@ -22,8 +22,10 @@
* Add OS mimetypes DB to docker image, mainly for unstructured-api compat.
* Use the image registry as a cache when building Docker images.
* Adds the ability for `partition_text` to group together broken paragraphs.
* Added a method to utils for date/time format validation
### Features
* Add Slack connector to pull messages for a specific channel
* Add --partition-by-api parameter to unstructured-ingest
* Added `partition_rtf` for processing rich text files.

View File

@@ -78,6 +78,7 @@ RUN python3.8 -m pip install pip==${PIP_VERSION} && \
pip install --no-cache -r requirements/ingest-google-drive.txt && \
pip install --no-cache -r requirements/ingest-reddit.txt && \
pip install --no-cache -r requirements/ingest-s3.txt && \
pip install --no-cache -r requirements/ingest-slack.txt && \
pip install --no-cache -r requirements/ingest-wikipedia.txt && \
pip install --no-cache -r requirements/local-inference.txt && \
pip install --no-cache "detectron2@git+https://github.com/facebookresearch/detectron2.git@e2ce8dc#egg=detectron2"

View File

@@ -75,6 +75,10 @@ install-ingest-gitlab:
install-ingest-reddit:
python3 -m pip install -r requirements/ingest-reddit.txt
.PHONY: install-ingest-slack
install-ingest-slack:
pip install -r requirements/ingest-slack.txt
.PHONY: install-ingest-wikipedia
install-ingest-wikipedia:
python3 -m pip install -r requirements/ingest-wikipedia.txt
@@ -112,6 +116,7 @@ pip-compile:
pip-compile --upgrade --extra=reddit --output-file=requirements/ingest-reddit.txt requirements/base.txt setup.py
pip-compile --upgrade --extra=github --output-file=requirements/ingest-github.txt requirements/base.txt setup.py
pip-compile --upgrade --extra=gitlab --output-file=requirements/ingest-gitlab.txt requirements/base.txt setup.py
pip-compile --upgrade --extra=slack --output-file=requirements/ingest-slack.txt requirements/base.txt setup.py
pip-compile --upgrade --extra=wikipedia --output-file=requirements/ingest-wikipedia.txt requirements/base.txt setup.py
pip-compile --upgrade --extra=google-drive --output-file=requirements/ingest-google-drive.txt requirements/base.txt setup.py

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
# Ingests a Slack channel's message history into a text file and processes it
# through Unstructured's library into structured output.
# Structured outputs are stored in slack-ingest-output/
# start-date, end-date arguments are optional
# slack-channels is a comma separated list of channel IDs.
# Bot user must be in the channels for them to be ingested.
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
cd "$SCRIPT_DIR"/../../.. || exit 1
PYTHONPATH=. ./unstructured/ingest/main.py \
--slack-channels 12345678 \
--slack-token 12345678 \
--download-dir slack-ingest-download \
--structured-output-dir slack-ingest-output \
--start-date 2023-04-01T01:00:00-08:00 \
--end-date 2023-04-02

View File

@@ -0,0 +1,208 @@
#
# This file is autogenerated by pip-compile with Python 3.8
# by the following command:
#
# pip-compile --extra=slack --output-file=requirements/ingest-slack.txt requirements/base.txt setup.py
#
anyio==3.6.2
# via
# -r requirements/base.txt
# httpcore
argilla==1.5.0
# via
# -r requirements/base.txt
# unstructured (setup.py)
backoff==2.2.1
# via
# -r requirements/base.txt
# argilla
certifi==2022.12.7
# via
# -r requirements/base.txt
# httpcore
# httpx
# requests
# unstructured (setup.py)
charset-normalizer==3.1.0
# via
# -r requirements/base.txt
# requests
click==8.1.3
# via
# -r requirements/base.txt
# nltk
commonmark==0.9.1
# via
# -r requirements/base.txt
# rich
deprecated==1.2.13
# via
# -r requirements/base.txt
# argilla
et-xmlfile==1.1.0
# via
# -r requirements/base.txt
# openpyxl
h11==0.14.0
# via
# -r requirements/base.txt
# httpcore
httpcore==0.16.3
# via
# -r requirements/base.txt
# httpx
httpx==0.23.3
# via
# -r requirements/base.txt
# argilla
idna==3.4
# via
# -r requirements/base.txt
# anyio
# requests
# rfc3986
importlib-metadata==6.1.0
# via
# -r requirements/base.txt
# markdown
joblib==1.2.0
# via
# -r requirements/base.txt
# nltk
lxml==4.9.2
# via
# -r requirements/base.txt
# python-docx
# python-pptx
# unstructured (setup.py)
markdown==3.4.3
# via
# -r requirements/base.txt
# unstructured (setup.py)
monotonic==1.6
# via
# -r requirements/base.txt
# argilla
msg-parser==1.2.0
# via
# -r requirements/base.txt
# unstructured (setup.py)
nltk==3.8.1
# via
# -r requirements/base.txt
# unstructured (setup.py)
numpy==1.23.5
# via
# -r requirements/base.txt
# argilla
# pandas
olefile==0.46
# via
# -r requirements/base.txt
# msg-parser
openpyxl==3.1.2
# via
# -r requirements/base.txt
# unstructured (setup.py)
packaging==23.0
# via
# -r requirements/base.txt
# argilla
pandas==1.5.3
# via
# -r requirements/base.txt
# argilla
# unstructured (setup.py)
pillow==9.4.0
# via
# -r requirements/base.txt
# python-pptx
# unstructured (setup.py)
pydantic==1.10.7
# via
# -r requirements/base.txt
# argilla
pygments==2.14.0
# via
# -r requirements/base.txt
# rich
pypandoc==1.11
# via
# -r requirements/base.txt
# unstructured (setup.py)
python-dateutil==2.8.2
# via
# -r requirements/base.txt
# pandas
python-docx==0.8.11
# via
# -r requirements/base.txt
# unstructured (setup.py)
python-magic==0.4.27
# via
# -r requirements/base.txt
# unstructured (setup.py)
python-pptx==0.6.21
# via
# -r requirements/base.txt
# unstructured (setup.py)
pytz==2023.3
# via
# -r requirements/base.txt
# pandas
regex==2023.3.23
# via
# -r requirements/base.txt
# nltk
requests==2.28.2
# via
# -r requirements/base.txt
# unstructured (setup.py)
rfc3986[idna2008]==1.5.0
# via
# -r requirements/base.txt
# httpx
rich==13.0.1
# via
# -r requirements/base.txt
# argilla
six==1.16.0
# via
# -r requirements/base.txt
# python-dateutil
slack-sdk==3.21.0
# via unstructured (setup.py)
sniffio==1.3.0
# via
# -r requirements/base.txt
# anyio
# httpcore
# httpx
tqdm==4.65.0
# via
# -r requirements/base.txt
# argilla
# nltk
typing-extensions==4.5.0
# via
# -r requirements/base.txt
# pydantic
# rich
urllib3==1.26.15
# via
# -r requirements/base.txt
# requests
wrapt==1.14.1
# via
# -r requirements/base.txt
# argilla
# deprecated
xlsxwriter==3.0.9
# via
# -r requirements/base.txt
# python-pptx
zipp==3.15.0
# via
# -r requirements/base.txt
# importlib-metadata

View File

@@ -87,6 +87,7 @@ setup(
],
"gitlab": ["python-gitlab"],
"reddit": ["praw"],
"slack": ["slack_sdk"],
"wikipedia": ["wikipedia"],
"google-drive": [
"google-api-python-client",

View File

@@ -0,0 +1,26 @@
[
{
"element_id": "cf80cd8aed482d5d1527d7dc72fceff8",
"text": "testing",
"type": "NarrativeText",
"metadata": {
"filename": "slack-ingest-download/C052BGT7718.txt"
}
},
{
"element_id": "55434f5d914503444c44a26487bf31bc",
"text": "<@U051UBRR946> has joined the channel",
"type": "NarrativeText",
"metadata": {
"filename": "slack-ingest-download/C052BGT7718.txt"
}
},
{
"element_id": "574858a3daf89b7ef96e41910a4ae56d",
"text": "<@U04ST78RXU3> has joined the channel",
"type": "NarrativeText",
"metadata": {
"filename": "slack-ingest-download/C052BGT7718.txt"
}
}
]
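
The fixture above is what partitioning the downloaded channel text file produces. A rough sketch of reproducing it by hand, assuming slack-ingest-download/C052BGT7718.txt already exists locally (one message per line, as written by the connector):

from unstructured.partition.text import partition_text

# Each line of the channel file becomes an element such as NarrativeText.
elements = partition_text(filename="slack-ingest-download/C052BGT7718.txt")
for element in elements:
    print(type(element).__name__, element.text)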

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env bash
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
cd "$SCRIPT_DIR"/.. || exit 1
if [ -z "$SLACK_TOKEN" ]; then
echo "Skipping Slack ingest test because the SLACK_TOKEN env var is not set."
exit 0
fi
PYTHONPATH=. ./unstructured/ingest/main.py \
--slack-channels C052BGT7718 \
--slack-token "${SLACK_TOKEN}" \
--download-dir slack-ingest-download \
--structured-output-dir slack-ingest-output \
--start-date 2023-04-01 \
--end-date 2023-04-08T12:00:00-08:00
OVERWRITE_FIXTURES=${OVERWRITE_FIXTURES:-false}
# to update ingest test fixtures, run scripts/ingest-test-fixtures-update.sh on x86_64
if [[ "$OVERWRITE_FIXTURES" != "false" ]]; then
cp slack-ingest-output/* test_unstructured_ingest/expected-structured-output/slack-ingest-channel/
elif ! diff -ru slack-ingest-output test_unstructured_ingest/expected-structured-output/slack-ingest-channel; then
echo
echo "There are differences from the previously checked-in structured outputs."
echo
echo "If these differences are acceptable, overwrite by the fixtures by setting the env var:"
echo
echo " export OVERWRITE_FIXTURES=true"
echo
echo "and then rerun this script."
echo
echo "NOTE: You'll likely just want to run scripts/ingest-test-fixtures-update.sh on x86_64 hardware"
echo "to update fixtures for CI."
echo
exit 1
fi

View File

@@ -16,4 +16,5 @@ export OMP_THREAD_LIMIT=1
./test_unstructured_ingest/test-ingest-biomed-api.sh
./test_unstructured_ingest/test-ingest-biomed-path.sh
./test_unstructured_ingest/test-ingest-local.sh
./test_unstructured_ingest/test-ingest-slack.sh
./test_unstructured_ingest/test-ingest-against-api.sh

View File

@@ -1 +1 @@
__version__ = "0.5.13-dev5" # pragma: no cover
__version__ = "0.5.13-dev6" # pragma: no cover

View File

@@ -2,7 +2,6 @@ import json
import os
import urllib.request
from dataclasses import dataclass
from datetime import datetime
from ftplib import FTP, error_perm
from pathlib import Path
from typing import List, Optional, Union
@@ -16,6 +15,9 @@ from unstructured.ingest.interfaces import (
BaseIngestDoc,
)
from unstructured.ingest.logger import logger
from unstructured.utils import (
validate_date_args,
)
DOMAIN = "ftp.ncbi.nlm.nih.gov"
FTP_DOMAIN = f"ftp://{DOMAIN}"
@@ -56,36 +58,14 @@ class SimpleBiomedConfig(BaseConnectorConfig):
fields_include: str = "element_id,text,type,metadata"
flatten_metadata: bool = False
def _validate_date_args(self, date):
date_formats = ["%Y-%m-%d", "%Y-%m-%d+%H:%M:%S"]
valid = False
if date:
date = date.replace(" ", "+").replace("%20", "+")
for format in date_formats:
try:
datetime.strptime(date, format)
valid = True
break
except ValueError:
pass
if not valid:
raise ValueError(
f"The from argument {date} does not satisfy the format: "
"YYYY-MM-DD or YYYY-MM-DD HH:MM:SS",
)
return valid
def validate_api_inputs(self):
valid = False
if self.from_:
valid = self._validate_date_args(self.from_)
valid = validate_date_args(self.from_)
if self.until:
valid = self._validate_date_args(self.until)
valid = validate_date_args(self.until)
return valid

View File

@@ -0,0 +1,224 @@
import json
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from unstructured.ingest.interfaces import (
BaseConnector,
BaseConnectorConfig,
BaseIngestDoc,
)
from unstructured.ingest.logger import logger
from unstructured.utils import (
requires_dependencies,
validate_date_args,
)
DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S%z")
@dataclass
class SimpleSlackConfig(BaseConnectorConfig):
"""Connector config to process all messages by channel id's."""
channels: List[str]
token: str
oldest: str
latest: str
# Standard Connector options
download_dir: str
output_dir: str
re_download: bool = False
preserve_downloads: bool = False
download_only: bool = False
metadata_include: Optional[str] = None
metadata_exclude: Optional[str] = None
partition_by_api: bool = False
partition_endpoint: str = "https://api.unstructured.io/general/v0/general"
fields_include: str = "element_id,text,type,metadata"
flatten_metadata: bool = False
verbose: bool = False
def validate_inputs(self):
oldest_valid = True
latest_valid = True
if self.oldest:
oldest_valid = validate_date_args(self.oldest)
if self.latest:
latest_valid = validate_date_args(self.latest)
return oldest_valid, latest_valid
def __post_init__(self):
oldest_valid, latest_valid = self.validate_inputs()
if not oldest_valid and not latest_valid:
raise ValueError(
"Start and/or End dates are not valid. ",
)
@staticmethod
def parse_channels(channel_str: str) -> List[str]:
"""Parses a comma separated list of channels into a list."""
return [x.strip() for x in channel_str.split(",")]
@dataclass
class SlackIngestDoc(BaseIngestDoc):
"""Class encapsulating fetching a doc and writing processed results (but not
doing the processing!).
Also includes a cleanup method. When things go wrong and the cleanup
method is not called, the file is left behind on the filesystem to assist debugging.
"""
config: SimpleSlackConfig
channel: str
token: str
oldest: str
latest: str
# NOTE(crag): probably doesn't matter, but intentionally not defining tmp_download_file in
# __post_init__, for multiprocessing simplicity (no Path objects in the initially
# instantiated object)
def _tmp_download_file(self):
channel_file = self.channel + ".txt"
return Path(self.config.download_dir) / channel_file
def _output_filename(self):
output_file = self.channel + ".json"
return Path(self.config.output_dir) / output_file
def has_output(self):
"""Determine if structured output for this doc already exists."""
return self._output_filename().is_file() and os.path.getsize(self._output_filename())
def _create_full_tmp_dir_path(self):
self._tmp_download_file().parent.mkdir(parents=True, exist_ok=True)
@requires_dependencies(dependencies=["slack_sdk"], extras="slack")
def get_file(self):
"""Fetches the data from a slack channel and stores it locally."""
self._create_full_tmp_dir_path()
if (
not self.config.re_download
and self._tmp_download_file().is_file()
and os.path.getsize(self._tmp_download_file())
):
if self.config.verbose:
logger.debug(f"File exists: {self._tmp_download_file()}, skipping download")
return
if self.config.verbose:
logger.debug(f"fetching channel {self.channel} - PID: {os.getpid()}")
messages = []
self.client = WebClient(token=self.token)
try:
oldest = "0"
latest = "0"
if self.oldest:
oldest = self.convert_datetime(self.oldest)
if self.latest:
latest = self.convert_datetime(self.latest)
result = self.client.conversations_history(
channel=self.channel,
oldest=oldest,
latest=latest,
)
messages.extend(result["messages"])
while result["has_more"]:
result = self.client.conversations_history(
channel=self.channel,
oldest=oldest,
latest=latest,
cursor=result["response_metadata"]["next_cursor"],
)
messages.extend(result["messages"])
except SlackApiError as e:
logger.error(f"Error: {e}")
with open(self._tmp_download_file(), "w") as channel_file:
for message in messages:
channel_file.write(message["text"] + "\n")
def write_result(self):
"""Write the structured json result for this doc. result must be json serializable."""
output_filename = self._output_filename()
output_filename.parent.mkdir(parents=True, exist_ok=True)
with open(output_filename, "w") as output_f:
output_f.write(json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2))
logger.info(f"Wrote {output_filename}")
def convert_datetime(self, date_time):
for format in DATE_FORMATS:
try:
return datetime.strptime(date_time, format).timestamp()
except ValueError:
pass
@property
def filename(self):
"""The filename of the file created from a slack channel"""
return self._tmp_download_file()
def cleanup_file(self):
"""Removes the local copy the file after successful processing."""
if not self.config.preserve_downloads:
if self.config.verbose:
logger.info(f"cleaning up channel {self.channel}")
os.unlink(self._tmp_download_file())
@requires_dependencies(dependencies=["slack_sdk"], extras="slack")
class SlackConnector(BaseConnector):
"""Objects of this class support fetching document(s) from"""
def __init__(self, config: SimpleSlackConfig):
self.config = config
self.cleanup_files = not config.preserve_downloads
def cleanup(self, cur_dir=None):
"""cleanup linginering empty sub-dirs, but leave remaining files
(and their paths) in tact as that indicates they were not processed"""
if not self.cleanup_files:
return
if cur_dir is None:
cur_dir = self.config.download_dir
sub_dirs = os.listdir(cur_dir)
os.chdir(cur_dir)
for sub_dir in sub_dirs:
# don't traverse symlinks, not that there ever should be any
if os.path.isdir(sub_dir) and not os.path.islink(sub_dir):
self.cleanup(sub_dir)
os.chdir("..")
if len(os.listdir(cur_dir)) == 0:
os.rmdir(cur_dir)
def initialize(self):
"""Verify that can get metadata for an object, validates connections info."""
pass
def get_ingest_docs(self):
return [
SlackIngestDoc(
self.config,
channel,
self.config.token,
self.config.oldest,
self.config.latest,
)
for channel in self.config.channels
]
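
For reference, a tiny sketch of how the oldest/latest strings and the channel list end up in the conversations_history call above (channel IDs are placeholders):

from datetime import datetime

# "2023-04-01" matches the first entry of DATE_FORMATS; the resulting epoch-seconds
# value is what gets passed as oldest/latest to the Slack API.
oldest = datetime.strptime("2023-04-01", "%Y-%m-%d").timestamp()
channels = [x.strip() for x in "C052BGT7718,C052ABCDEFG".split(",")]  # parse_channels behavior
print(oldest, channels)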

View File

@@ -321,6 +321,29 @@ class MainProcess:
"the default download ~/.cache/... location in case --download-dir is not specified and "
"skip processing them through unstructured.",
)
@click.option(
"--slack-channels",
default=None,
help="Comma separated list of Slack channel IDs to pull messages from, "
"can be a public or private channel",
)
@click.option(
"--slack-token",
default=None,
help="Bot token used to access Slack API, must have channels:history " "scope for the bot user",
)
@click.option(
"--start-date",
default=None,
help="Start date/time in formats YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS or "
"YYYY-MM-DD+HH:MM:SS or YYYY-MM-DDTHH:MM:SStz",
)
@click.option(
"--end-date",
default=None,
help="End date/time in formats YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS or "
"YYYY-MM-DD+HH:MM:SS or YYYY-MM-DDTHH:MM:SStz",
)
@click.option(
"--download-dir",
help="Where files are downloaded to, defaults to `$HOME/.cache/unstructured/ingest/<SHA256>`.",
@@ -380,6 +403,10 @@ def main(
reddit_search_query,
reddit_num_posts,
re_download,
slack_channels,
slack_token,
start_date,
end_date,
download_dir,
preserve_downloads,
structured_output_dir,
@@ -545,6 +572,7 @@
" and `az`.",
UserWarning,
)
from unstructured.ingest.connector.fsspec import (
FsspecConnector,
SimpleFsspecConfig,
@@ -646,6 +674,26 @@ def main(
download_only=download_only,
),
)
elif slack_channels:
from unstructured.ingest.connector.slack import (
SimpleSlackConfig,
SlackConnector,
)
doc_connector = SlackConnector( # type: ignore
config=SimpleSlackConfig(
channels=SimpleSlackConfig.parse_channels(slack_channels),
token=slack_token,
oldest=start_date,
latest=end_date,
# default params:
download_dir=download_dir,
preserve_downloads=preserve_downloads,
output_dir=structured_output_dir,
re_download=re_download,
verbose=verbose,
),
)
elif wikipedia_page_title:
doc_connector = WikipediaConnector( # type: ignore
config=SimpleWikipediaConfig(

View File

@@ -1,8 +1,11 @@ import importlib
import importlib
import json
from datetime import datetime
from functools import wraps
from typing import Dict, List, Optional, Union
DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d+%H:%M:%S", "%Y-%m-%dT%H:%M:%S%z")
def save_as_jsonl(data: List[Dict], filename: str) -> None:
with open(filename, "w+") as output_file:
@@ -50,3 +53,21 @@ def dependency_exists(dependency):
except ImportError:
return False
return True
# Copied from unstructured/ingest/connector/biomed.py
def validate_date_args(date: Optional[str] = None):
if not date:
raise ValueError("The argument date is None.")
for format in DATE_FORMATS:
try:
datetime.strptime(date, format)
return True
except ValueError:
pass
raise ValueError(
f"The argument {date} does not satisfy the format: "
"YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS or YYYY-MM-DD+HH:MM:SS or YYYY-MM-DDTHH:MM:SStz",
)
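
A quick illustration of the helper's behavior (a sketch; the import path assumes the function lives in unstructured.utils as shown above):

from unstructured.utils import validate_date_args

print(validate_date_args("2023-04-01"))                 # True
print(validate_date_args("2023-04-08T12:00:00-08:00"))  # True, matches %Y-%m-%dT%H:%M:%S%z
try:
    validate_date_args("04/01/2023")
except ValueError as err:
    print(err)  # lists the accepted formats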