feat: Implement LabelBox staging brick (#26)

* Implement stage_for_label_box function

* Add unit tests for stage_for_label_box function

* Update docs with description and example for stage_for_label_box function

* Bump version and update CHANGELOG.md

* Fix linting issues and implement suggested changes

* Update stage_for_label_box docs with a note for uploading files to cloud providers
This commit is contained in:
asymness 2022-10-11 19:15:25 +05:00 committed by GitHub
parent 546865fd64
commit ec5be8e8b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 306 additions and 2 deletions

View File

@ -1,5 +1,6 @@
## 0.2.1-dev5
## 0.2.1-dev6
* Added staging brick for LabelBox.
* Added ability to upload LabelStudio predictions
* Added utility function for JSONL reading and writing
* Added staging brick for CSV format for Prodigy

View File

@ -562,3 +562,70 @@ Examples:
# The resulting CSV file is ready to be used with Prodigy
with open("prodigy.csv", "w") as csv_file:
csv_file.write(prodigy_csv_data)
``stage_for_label_box``
--------------------------
Formats outputs for use with `LabelBox <https://docs.labelbox.com/docs/overview>`_. LabelBox accepts cloud-hosted data
and does not support importing text directly. The ``stage_for_label_box`` function does the following:
* Stages the data files in the ``output_directory`` specified in function arguments to be uploaded to a cloud storage service.
* Returns a config of type ``List[Dict[str, Any]]`` that can be written to a ``json`` file and imported into LabelBox.
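**Note:** ``stage_for_label_box`` does not upload the data to remote storage such as S3. Users can upload the data to S3
using ``aws s3 sync ${output_directory} ${s3_uri}`` after running the ``stage_for_label_box`` staging brick, where ``${s3_uri}`` is the ``s3://<bucket>/<key-prefix>`` location that ``url_prefix`` serves (``aws s3 sync`` does not accept an ``https://`` URL as its destination).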
Examples:
The following example demonstrates generating a ``config.json`` file that can be used with LabelBox and uploading the staged data
files to an S3 bucket.
.. code:: python
import os
import json
from unstructured.documents.elements import Title, NarrativeText
from unstructured.staging.label_box import stage_for_label_box
# The S3 Bucket name where data files should be uploaded.
S3_BUCKET_NAME = "labelbox-staging-bucket"
# The S3 key prefix (i.e. directory) where data files should be stored.
S3_BUCKET_KEY_PREFIX = "data/"
# The URL prefix where the data files will be accessed.
S3_URL_PREFIX = f"https://{S3_BUCKET_NAME}.s3.amazonaws.com/{S3_BUCKET_KEY_PREFIX}"
# The local output directory where the data files will be staged for uploading to a Cloud Storage service.
LOCAL_OUTPUT_DIRECTORY = "/tmp/labelbox-staging"
elements = [Title(text="Title"), NarrativeText(text="Narrative")]
labelbox_config = stage_for_label_box(
elements,
output_directory=LOCAL_OUTPUT_DIRECTORY,
url_prefix=S3_URL_PREFIX,
external_ids=["id1", "id2"],
attachments=[[{"type": "RAW_TEXT", "value": "Title description"}], [{"type": "RAW_TEXT", "value": "Narrative Description"}]],
create_directory=True,
)
# The resulting JSON config file is ready to be used with LabelBox.
with open("config.json", "w+") as labelbox_config_file:
json.dump(labelbox_config, labelbox_config_file, indent=4)
# Upload staged data files to S3 from local output directory.
def upload_staged_files():
import boto3
s3 = boto3.client("s3")
for filename in os.listdir(LOCAL_OUTPUT_DIRECTORY):
filepath = os.path.join(LOCAL_OUTPUT_DIRECTORY, filename)
upload_key = os.path.join(S3_BUCKET_KEY_PREFIX, filename)
s3.upload_file(filepath, Bucket=S3_BUCKET_NAME, Key=upload_key)
upload_staged_files()

View File

@ -0,0 +1,135 @@
import os
import pytest
import unstructured.staging.label_box as label_box
from unstructured.documents.elements import Title, NarrativeText
@pytest.fixture
def elements():
    """Sample elements to stage: one title followed by one narrative text."""
    title = Title(text="Title 1")
    narrative = NarrativeText(text="Narrative 1")
    return [title, narrative]
@pytest.fixture
def output_directory(tmp_path):
    """Path of pytest's ``tmp_path`` directory, converted to a plain string."""
    existing_dir = str(tmp_path)
    return existing_dir
@pytest.fixture
def nonexistent_output_directory(tmp_path):
    """Path of a ``nonexistent_dir`` entry under ``tmp_path``; the fixture does not create it."""
    parent_dir = str(tmp_path)
    return os.path.join(parent_dir, "nonexistent_dir")
@pytest.fixture
def url_prefix():
    """URL prefix passed to the staging brick as the location of the data files."""
    prefix = "https://storage.googleapis.com/labelbox-sample-datasets/nlp"
    return prefix
@pytest.mark.parametrize(
    "attachments, raises_error",
    [
        (
            [
                {"type": "RAW_TEXT", "value": "Description Text"},
                {"type": "IMAGE", "value": "Image label", "ignored_value": 123},
            ],
            False,
        ),
        ([{"type": "INVALID_TYPE", "value": "Description Text"}], True),
        ([{"type": "RAW_TEXT", "value": 1}], True),
        ([{"type": "RAW_TEXT"}], True),
        ([{"value": "My text label"}], True),
    ],
)
def test_validate_attachments(attachments, raises_error):
    """Valid attachment lists pass silently; malformed ones raise ValueError."""
    if not raises_error:
        # Happy path: the validator must simply return without raising.
        label_box._validate_attachments(attachments, 0)
        return

    with pytest.raises(ValueError):
        label_box._validate_attachments(attachments, 0)
# Minimal valid attachment reused across the parametrized cases below.
attachment = {"type": "RAW_TEXT", "value": "Text description."}


@pytest.mark.parametrize(
    (
        "external_ids, attachments, output_directory_fixture, create_directory, "
        "raises, exception_class"
    ),
    [
        (None, None, "output_directory", True, False, None),
        (["id1", "id2"], None, "output_directory", True, False, None),
        (["id1"], None, "output_directory", True, True, ValueError),
        (None, [[attachment], [attachment]], "output_directory", True, False, None),
        (None, [[attachment]], "output_directory", True, True, ValueError),
        (["id1", "id2"], [[attachment] * 2, [attachment]], "output_directory", True, False, None),
        (
            ["id1", "id2"],
            [[attachment] * 2, [attachment]],
            "nonexistent_output_directory",
            True,
            False,
            None,
        ),
        (
            ["id1", "id2"],
            [[attachment] * 2, [attachment]],
            "nonexistent_output_directory",
            False,
            True,
            FileNotFoundError,
        ),
    ],
)
def test_stage_for_label_box(
    elements,
    url_prefix,
    external_ids,
    attachments,
    output_directory_fixture,
    create_directory,
    raises,
    exception_class,
    request,
):
    """
    Checks the config returned by stage_for_label_box and the files it writes.

    Failing cases only assert the exception type. Passing cases check the
    external id, the attachments, the data URL and the staged file contents
    for every element.
    """
    # The directory fixture is selected by name so one test covers both the
    # existing and the nonexistent output directory.
    output_directory = request.getfixturevalue(output_directory_fixture)

    def stage():
        return label_box.stage_for_label_box(
            elements,
            output_directory,
            url_prefix,
            external_ids=external_ids,
            attachments=attachments,
            create_directory=create_directory,
        )

    if raises:
        with pytest.raises(exception_class):
            stage()
        return

    config = stage()
    assert len(config) == len(elements)

    for index, (element_config, element) in enumerate(zip(config, elements)):
        # Without explicit external ids the brick falls back to the element's own id.
        expected_id = external_ids[index] if external_ids else element.id
        assert element_config["externalId"] == expected_id

        if attachments:
            assert element_config["attachments"] == [
                {"type": item["type"], "value": item["value"]} for item in attachments[index]
            ]

        assert element_config["data"].startswith(url_prefix)
        assert element_config["data"].endswith(f"{expected_id}.txt")

        output_filepath = os.path.join(output_directory, f"{expected_id}.txt")
        with open(output_filepath, "r", encoding="utf-8") as data_file:
            assert data_file.read().strip() == element.text.strip()

View File

@ -1 +1 @@
__version__ = "0.2.1-dev5" # pragma: no cover
__version__ = "0.2.1-dev6" # pragma: no cover

View File

@ -0,0 +1,101 @@
import os
from typing import Any, Dict, List, Optional, Union, Sequence
from unstructured.documents.elements import Text, NoID
VALID_ATTACHMENT_TYPES: List[str] = ["IMAGE", "VIDEO", "RAW_TEXT", "TEXT_URL", "HTML"]


def _validate_attachments(attachment_list: List[Dict[str, str]], element_index: int) -> None:
    """
    Validates the attachment list specified for a single element.

    Every attachment must provide a "type" key whose value is a string matching
    one of VALID_ATTACHMENT_TYPES (compared case-insensitively) and a "value"
    key whose value is a string. Any other keys are not inspected.

    Parameters
    ----------
    attachment_list
        The attachments given for one element.
    element_index
        Index of that element in the elements list; only used to build the
        error message.

    Raises
    ------
    ValueError
        If an attachment is missing a required key, has an unknown type, or
        has a non-string value. The message names both the attachment index
        and the element index.
    """
    for attachment_index, attachment in enumerate(attachment_list):
        # No trailing period here: every message below appends ". <detail>" itself.
        error_message_prefix = (
            f"Error at index {attachment_index} of attachments parameter "
            f"for element at index {element_index}"
        )
        try:
            attachment_type = attachment["type"]
            attachment_value = attachment["value"]
        except KeyError as e:
            raise ValueError(
                f"{error_message_prefix}. Missing required key: {e.args[0]}"
            ) from e

        if (
            not isinstance(attachment_type, str)
            or attachment_type.upper() not in VALID_ATTACHMENT_TYPES
        ):
            raise ValueError(
                f"{error_message_prefix}. Invalid value specified for attachment.type. "
                f"Must be one of: {', '.join(VALID_ATTACHMENT_TYPES)}"
            )

        if not isinstance(attachment_value, str):
            raise ValueError(
                f"{error_message_prefix}. Invalid value specified for attachment.value. "
                "Must be of type string."
            )
def stage_for_label_box(
    elements: List[Text],
    output_directory: str,
    url_prefix: str,
    external_ids: Optional[List[str]] = None,
    attachments: Optional[List[List[Dict[str, str]]]] = None,
    create_directory: bool = False,
) -> List[Dict[str, Any]]:
    """
    Stages documents to be uploaded to LabelBox and generates LabelBox configuration.
    ref: https://docs.labelbox.com/reference/data-import-format-overview

    The text of each element is written to ``<output_directory>/<id>.txt`` as
    UTF-8. Nothing is uploaded by this function; the returned config only
    references where the files are expected to be served from.

    Parameters
    ----------
    elements
        The elements whose text should be staged.
    output_directory
        Local directory the text files are written to.
    url_prefix
        Prefix used to build each "data" URL as ``<url_prefix>/<id>.txt``.
        Trailing slashes on the prefix are dropped before joining.
    external_ids
        Optional ids, one per element. When omitted, each element's ``id`` is used.
    attachments
        Optional list of attachment lists, one per element. Each attachment
        needs a "type" and a "value" key.
    create_directory
        When True, ``output_directory`` is created if it does not exist.

    Returns
    -------
    One dict per element with the keys "data" and "attachments" (attachment
    types upper-cased), plus "externalId" when the element's id is a string.

    Raises
    ------
    ValueError
        If ``external_ids`` or ``attachments`` differ in length from
        ``elements``, or if an attachment fails validation.
    FileNotFoundError
        If ``output_directory`` does not exist and ``create_directory`` is False.
    """
    ids: Sequence[Union[str, NoID]]
    if (external_ids is not None) and len(external_ids) != len(elements):
        raise ValueError(
            "The external_ids parameter must be a list and the length of external_ids parameter "
            "must be the same as the length of elements parameter."
        )
    elif external_ids is None:
        ids = [element.id for element in elements]
    else:
        ids = external_ids

    if (attachments is not None) and len(attachments) != len(elements):
        raise ValueError(
            "The attachments parameter must be a list and the length of attachments parameter "
            "must be the same as the length of elements parameter."
        )
    elif attachments is None:
        attachments = [[] for _ in elements]
    else:
        for index, attachment_list in enumerate(attachments):
            _validate_attachments(attachment_list, index)

    # All argument validation happens above, before anything touches the filesystem.
    if create_directory:
        os.makedirs(output_directory, exist_ok=True)
    elif not os.path.isdir(output_directory):
        raise FileNotFoundError(output_directory)

    base_url = url_prefix.rstrip("/")
    config_data: List[Dict[str, Any]] = []
    for element, element_id, attachment_list in zip(elements, ids, attachments):
        # NOTE: two elements sharing an id map to the same file name, so the
        # later one overwrites the earlier one's staged file.
        output_filename = f"{element_id}.txt"
        data_url = "/".join([base_url, output_filename])
        output_filepath = os.path.join(output_directory, output_filename)

        # Explicit UTF-8 so the staged bytes do not depend on the platform's
        # default locale encoding.
        with open(output_filepath, "w", encoding="utf-8") as output_text_file:
            output_text_file.write(element.text)

        element_config: Dict[str, Any] = {
            "data": data_url,
            "attachments": [
                {"type": attachment["type"].upper(), "value": attachment["value"]}
                for attachment in attachment_list
            ],
        }

        # Only string ids are emitted as externalId.
        if isinstance(element_id, str):
            element_config["externalId"] = element_id

        config_data.append(element_config)

    return config_data