feat: adds data source properties to delta table connector. (#1464)

This commit is contained in:
rvztz 2023-09-27 18:46:01 -06:00 committed by GitHub
parent fd79c5262c
commit 2e01c49d90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 38 additions and 23 deletions

View File

@ -1,8 +1,8 @@
## 0.10.17-dev14
## 0.10.17-dev15
### Enhancements
* **Adds data source properties to SharePoint, Outlook, Onedrive, Reddit, and Slack connectors** These properties (date_created, date_modified, version, source_url, record_locator) are written to element metadata during ingest, mapping elements to information about the document source from which they derive. This functionality enables downstream applications to reveal source document applications, e.g. a link to a GDrive doc, Salesforce record, etc.
* **Adds data source properties to SharePoint, Outlook, Onedrive, Reddit, Slack, and DeltaTable connectors** These properties (date_created, date_modified, version, source_url, record_locator) are written to element metadata during ingest, mapping elements to information about the document source from which they derive. This functionality enables downstream applications to reveal source document applications, e.g. a link to a GDrive doc, Salesforce record, etc.
* **Add functionality to save embedded images in PDF's separately as images** This allows users to save embedded images in PDF's separately as images, given some directory path. The saved image path is written to the metadata for the Image element. Downstream applications may benefit by providing users with image links from relevant "hits."
* **Azure Cognite Search destination connector** New Azure Cognitive Search destination connector added to ingest CLI. Users may now use `unstructured-ingest` to write partitioned data from over 20 data sources (so far) to an Azure Cognitive Search index.
* **Improves salesforce partitioning** Partitions Salesforce data as xlm instead of text for improved detail and flexibility. Partitions htmlbody instead of textbody for Salesforce emails. Importance: Allows all Salesforce fields to be ingested and gives Salesforce emails more detailed partitioning.

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-0.parquet",
"version": 264934223616864047145159629306912568989,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.604000"
},

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-1.parquet",
"version": 139732878514171884135017505553329458078,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.629000"
},

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-2.parquet",
"version": 94569544647555135566266174719335103474,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.634000"
},

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-3.parquet",
"version": 153924277850028657610430472976884166368,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.609000"
},

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-4.parquet",
"version": 106461216032689499284003671440554259965,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.599000"
},

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-5.parquet",
"version": 164150003651878262139646734756859067992,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.614000"
},

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-6.parquet",
"version": 117019847084687446803154205344125897829,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.619000"
},

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-7.parquet",
"version": 93578343538662480706683120160579695806,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.624000"
},

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-8.parquet",
"version": 329407810704817028643559273505069222621,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.644000"
},

View File

@ -5,6 +5,7 @@
"metadata": {
"data_source": {
"url": "s3://utic-dev-tech-fixtures/sample-delta-lake-data/deltatable/0-9d594ee0-ad36-4e7e-a6be-f53975fe3d10-9.parquet",
"version": 127086160869624650753647884727730407942,
"date_created": "2023-08-16 13:30:38.644000",
"date_modified": "2023-08-16 13:30:38.639000"
},

View File

@ -1 +1 @@
__version__ = "0.10.17-dev14" # pragma: no cover
__version__ = "0.10.17-dev15" # pragma: no cover

View File

@ -15,6 +15,7 @@ from unstructured.ingest.interfaces import (
BaseSourceConnector,
IngestDocCleanupMixin,
SourceConnectorCleanupMixin,
SourceMetadata,
WriteConfig,
)
from unstructured.ingest.logger import logger
@ -50,26 +51,10 @@ class DeltaTableIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
basename = os.path.basename(self.uri)
return os.path.splitext(basename)[0]
@property
def source_url(self) -> t.Optional[str]:
"""The url of the source document."""
return self.uri
@property
def date_created(self) -> t.Optional[str]:
"""This is the creation time of the table itself, not the file or specific record"""
# TODO get creation time of file/record
return self.created_at
@property
def filename(self):
return (Path(self.read_config.download_dir) / f"{self.uri_filename()}.csv").resolve()
@property
def date_modified(self) -> t.Optional[str]:
"""The date the document was last modified on the source system."""
return self.modified_date
@property
def _output_filename(self):
"""Create filename document id combined with a hash of the query to uniquely identify
@ -80,11 +65,8 @@ class DeltaTableIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
self.filename.parent.mkdir(parents=True, exist_ok=True)
self._output_filename.parent.mkdir(parents=True, exist_ok=True)
@SourceConnectionError.wrap
@BaseIngestDoc.skip_if_file_exists
@requires_dependencies(["fsspec"], extras="delta-table")
def get_file(self):
import pyarrow.parquet as pq
def _get_fs_from_uri(self):
from fsspec.core import url_to_fs
try:
@ -94,6 +76,29 @@ class DeltaTableIngestDoc(IngestDocCleanupMixin, BaseIngestDoc):
f"uri {self.uri} may be associated with a filesystem that "
f"requires additional dependencies: {error}",
)
return fs
def update_source_metadata(self, **kwargs):
fs = kwargs.get("fs", self._get_fs_from_uri())
version = (
fs.checksum(self.uri) if fs.protocol != "gs" else fs.info(self.uri).get("etag", "")
)
file_exists = fs.exists(self.uri)
self.source_metadata = SourceMetadata(
date_created=self.created_at,
date_modified=self.modified_date,
version=version,
source_url=self.uri,
exists=file_exists,
)
@SourceConnectionError.wrap
@BaseIngestDoc.skip_if_file_exists
def get_file(self):
import pyarrow.parquet as pq
fs = self._get_fs_from_uri()
self.update_source_metadata(fs=fs)
logger.info(f"using a {fs} filesystem to collect table data")
self._create_full_tmp_dir_path()
logger.debug(f"Fetching {self} - PID: {os.getpid()}")