feat: detect PGP encrypted content in partition_email and partition_msg (#1205)

### Summary

Closes #1018. Enables `partition_email` and `partition_msg` to detect if
an email has PGP encrypted content. Based on the specification in [RFC
2015](https://www.ietf.org/rfc/rfc2015.txt). The test emails are based
on the example email in the spec. If PGP encrypted content is detected, a
warning is emitted and an empty list of elements is returned.

### Testing

```python
from unstructured.partition.email import partition_email

filename = "example-docs/eml/fake-encrypted.eml"
partition_email(filename=filename)
```

```python
from unstructured.partition.msg import partition_msg

filename = "example-docs/fake-encrypted.msg"
partition_msg(filename=filename)
```
This commit is contained in:
Matt Robinson 2023-08-25 20:09:25 -04:00 committed by GitHub
parent 5872fa23c3
commit 07f76275f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 95 additions and 7 deletions

View File

@ -1,8 +1,11 @@
## 0.10.6-dev2
## 0.10.6-dev3
### Enhancements
* Add threaded Slack conversations into Slack connector output
* Enable `partition_email` and `partition_msg` to detect if an email is PGP encrypted. If
an email is PGP encrypted, the functions will return an empty list of elements and
emit a warning about the encrypted content.
* Add threaded Slack conversations into Slack connector output
* Add functionality to sort elements using `xy-cut` sorting approach in `partition_pdf` for `hi_res` and `fast` strategies
### Features

View File

@ -265,6 +265,9 @@ workflow looks like:
filename=filename, process_attachments=True, attachment_partitioner=partition
)
If the content of an email is PGP encrypted, ``partition_email`` will return an empty
list of elements and emit a warning indicating that the email is encrypted.
For more information about the ``partition_email`` brick, you can check the `source code here <https://github.com/Unstructured-IO/unstructured/blob/a583d47b841bdd426b9058b7c34f6aa3ed8de152/unstructured/partition/email.py>`_.
@ -457,6 +460,9 @@ workflow looks like:
filename=filename, process_attachments=True, attachment_partitioner=partition
)
If the content of an email is PGP encrypted, ``partition_msg`` will return an empty
list of elements and emit a warning indicating that the email is encrypted.
For more information about the ``partition_msg`` brick, you can check the `source code here <https://github.com/Unstructured-IO/unstructured/blob/a583d47b841bdd426b9058b7c34f6aa3ed8de152/unstructured/partition/msg.py>`_.

View File

@ -0,0 +1,26 @@
From: Michael Elkins <elkins@aero.org>
To: Michael Elkins <elkins@aero.org>
Mime-Version: 1.0
Content-Type: multipart/encrypted; boundary=foo;
protocol="application/pgp-encrypted"
--foo
Content-Type: application/pgp-encrypted
Version: 1
--foo
Content-Type: application/octet-stream
-----BEGIN PGP MESSAGE-----
Version: 2.6.2
hIwDY32hYGCE8MkBA/wOu7d45aUxF4Q0RKJprD3v5Z9K1YcRJ2fve87lMlDlx4Oj
eW4GDdBfLbJE7VUpp13N19GL8e/AqbyyjHH4aS0YoTk10QQ9nnRvjY8nZL3MPXSZ
g9VGQxFeGqzykzmykU6A26MSMexR4ApeeON6xzZWfo+0yOqAq6lb46wsvldZ96YA
AABH78hyX7YX4uT1tNCWEIIBoqqvCeIMpp7UQ2IzBrXg6GtukS8NxbukLeamqVW3
1yt21DYOjuLzcMNe/JNsD9vDVCvOOG3OCi8=
=zzaA
-----END PGP MESSAGE-----
--foo--

Binary file not shown.

View File

@ -64,6 +64,7 @@ def test_partition_msg_from_filename_with_metadata_filename():
class MockMsOxMessage:
def __init__(self, filename):
self.body = "Here is an email with plain text."
self.header_dict = {"Content-Type": "text/plain"}
def test_partition_msg_from_filename_with_text_content(monkeypatch):
@ -240,3 +241,14 @@ def test_partition_msg_custom_metadata_date(
)
assert elements[0].metadata.last_modified == expected_last_modification_date
def test_partition_msg_with_pgp_encrypted_message(
caplog,
filename="example-docs/fake-encrypted.msg",
):
elements = partition_msg(filename=filename)
assert elements == []
assert "WARNING" in caplog.text
assert "Encrypted email detected" in caplog.text

View File

@ -549,3 +549,14 @@ def test_partition_email_odd_attachment_filename(
)
assert elements[1].metadata.filename == "odd=file=name.txt"
def test_partition_email_with_pgp_encrypted_message(
caplog,
filename="example-docs/eml/fake-encrypted.eml",
):
elements = partition_email(filename=filename)
assert elements == []
assert "WARNING" in caplog.text
assert "Encrypted email detected" in caplog.text

View File

@ -1 +1 @@
__version__ = "0.10.6-dev2" # pragma: no cover
__version__ = "0.10.6-dev3" # pragma: no cover

View File

@ -14,6 +14,7 @@ from unstructured.file_utils.encoding import (
read_txt_file,
validate_encoding,
)
from unstructured.logger import logger
from unstructured.partition.common import (
convert_to_bytes,
exactly_one,
@ -49,7 +50,6 @@ from unstructured.documents.email_elements import (
Subject,
)
from unstructured.file_utils.filetype import FileType, add_metadata_with_filetype
from unstructured.logger import logger
from unstructured.nlp.patterns import EMAIL_DATETIMETZ_PATTERN_RE
from unstructured.partition.html import partition_html
from unstructured.partition.text import partition_text
@ -322,6 +322,7 @@ def partition_email(
if not encoding:
encoding = detected_encoding
is_encrypted = False
content_map: Dict[str, str] = {}
for part in msg.walk():
# NOTE(robinson) - content disposition is None for the content of the email itself.
@ -329,11 +330,26 @@ def partition_email(
if part.get_content_disposition() is not None:
continue
content_type = part.get_content_type()
# NOTE(robinson) - Per RFC 2015, the content type for emails with PGP encrypted
# content is multipart/encrypted
# ref: https://www.ietf.org/rfc/rfc2015.txt
if content_type.endswith("encrypted"):
is_encrypted = True
content_map[content_type] = part.get_payload()
content = content_map.get(content_source, "")
if not content:
elements = []
elements: List[Element] = []
if is_encrypted:
logger.warning(
"Encrypted email detected. Partition function will return an empty list.",
)
elif not content:
pass
elif content_source == "text/html":
# NOTE(robinson) - In the .eml files, the HTML content gets stored in a format that

View File

@ -6,6 +6,7 @@ import msg_parser
from unstructured.documents.elements import Element, ElementMetadata, process_metadata
from unstructured.file_utils.filetype import FileType, add_metadata_with_filetype
from unstructured.logger import logger
from unstructured.partition.common import exactly_one
from unstructured.partition.email import convert_to_iso_8601
from unstructured.partition.html import partition_html
@ -60,8 +61,21 @@ def partition_msg(
tmp.close()
msg_obj = msg_parser.MsOxMessage(tmp.name)
# NOTE(robinson) - Per RFC 2015, the content type for emails with PGP encrypted
# content is multipart/encrypted
# ref: https://www.ietf.org/rfc/rfc2015.txt
content_type = msg_obj.header_dict.get("Content-Type", "")
is_encrypted = "encrypted" in content_type
text = msg_obj.body
if "<html>" in text or "</div>" in text:
elements: List[Element] = []
if is_encrypted:
logger.warning(
"Encrypted email detected. Partition function will return an empty list.",
)
elif text is None:
pass
elif "<html>" in text or "</div>" in text:
elements = partition_html(text=text)
else:
elements = partition_text(