MINOR: Add support for s3 unstructured files (#16936)
Commit: afafb4af92
Parent: 97a733b704
@@ -176,6 +176,18 @@ class S3Source(StorageServiceSource):
         yield Either(right=container_request)
         self.register_record(container_request=container_request)
 
+    def get_size(self, bucket_name: str, file_path: str) -> Optional[float]:
+        """
+        Method to get the size of the file
+        """
+        try:
+            file_obj = self.s3_client.head_object(Bucket=bucket_name, Key=file_path)
+            return file_obj["ContentLength"]
+        except Exception as exc:
+            logger.debug(f"Failed to get size of file due to {exc}")
+            logger.debug(traceback.format_exc())
+        return None
+
     def _generate_container_details(
         self,
         bucket_response: S3BucketResponse,
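For context, the new `get_size` helper only issues an S3 `HEAD` request, so the object body is never downloaded. Below is a minimal standalone sketch of the same idea, assuming a default boto3 client and placeholder bucket/key names; it is not the connector code itself.

```python
# Minimal sketch of the size lookup above; bucket and key names are placeholders.
from typing import Optional

import boto3

s3_client = boto3.client("s3")


def object_size_bytes(bucket: str, key: str) -> Optional[int]:
    """Return the object's size in bytes via a HEAD request, or None on failure."""
    try:
        head = s3_client.head_object(Bucket=bucket, Key=key)
        return head["ContentLength"]
    except Exception:
        # The connector logs and swallows errors the same way, returning None.
        return None


# Example usage (assumes the bucket/key exist and credentials are configured):
# print(object_size_bytes("test-bucket", "solved.png"))
```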
@@ -183,6 +195,31 @@ class S3Source(StorageServiceSource):
         parent: Optional[EntityReference] = None,
     ) -> Optional[S3ContainerDetails]:
         bucket_name = bucket_response.name
+        object_size = self.get_size(
+            bucket_name=bucket_name,
+            file_path=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+        )
+        if not metadata_entry.structureFormat and object_size:
+            prefix = f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
+            return S3ContainerDetails(
+                name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+                prefix=prefix,
+                creation_date=bucket_response.creation_date.isoformat()
+                if bucket_response.creation_date
+                else None,
+                file_formats=[],
+                data_model=None,
+                parent=parent,
+                size=self.get_size(
+                    bucket_name=bucket_name,
+                    file_path=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+                ),
+                fullPath=self._get_full_path(bucket_name, prefix),
+                sourceUrl=self._get_object_source_url(
+                    bucket_name=bucket_name,
+                    prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+                ),
+            )
         sample_key = self._get_sample_file_path(
             bucket_name=bucket_name, metadata_entry=metadata_entry
         )
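In short, `_generate_container_details` now short-circuits for manifest entries that declare no `structureFormat`: they are treated as unstructured files and are only emitted when a size could actually be fetched. A reduced sketch of that guard condition, as a hypothetical helper rather than the connector's own code:

```python
# Hypothetical reduction of the new branch's guard condition.
def is_unstructured_file(structure_format, object_size) -> bool:
    # No declared format, and a non-zero size that was successfully fetched.
    return not structure_format and bool(object_size)


assert is_unstructured_file(None, 1_200) is True      # e.g. an image such as solved.png
assert is_unstructured_file("csv", 1_200) is False    # structured entries keep the old path
assert is_unstructured_file(None, None) is False      # size lookup failed: entry is skipped
```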
@@ -52,21 +52,21 @@ class S3ContainerDetails(BaseModel):
 
     name: str = Field(..., description="Bucket name")
     prefix: str = Field(..., description="Prefix for the container")
-    number_of_objects: float = Field(
-        ...,
+    number_of_objects: Optional[float] = Field(
+        None,
         description="Total nr. of objects",
     )
-    size: float = Field(
-        ...,
+    size: Optional[float] = Field(
+        None,
         description="Total size in bytes of all objects",
         title="Total size(bytes) of objects",
     )
     file_formats: Optional[List[FileFormat]] = Field(
-        ...,
+        None,
         description="File formats",
     )
     data_model: Optional[ContainerDataModel] = Field(
-        ...,
+        None,
         description="Data Model of the container",
     )
     creation_date: Optional[str] = Field(
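This schema change relaxes `number_of_objects`, `size`, `file_formats`, and `data_model` from required to optional, which is what lets the unstructured-file path construct the model with `data_model=None` and an empty format list. A minimal pydantic sketch of the difference, not the real `S3ContainerDetails` model:

```python
# Illustrative only: Field(...) marks an attribute required, Field(None) makes it optional.
from typing import Optional

from pydantic import BaseModel, Field, ValidationError


class RequiredSize(BaseModel):
    size: float = Field(..., description="Total size in bytes")


class OptionalSize(BaseModel):
    size: Optional[float] = Field(None, description="Total size in bytes")


OptionalSize()  # fine: size defaults to None
try:
    RequiredSize()  # missing required field
except ValidationError as exc:
    print(exc.errors()[0]["type"])  # "missing" in pydantic v2, "value_error.missing" in v1
```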
@@ -205,7 +205,7 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):
             entity_type=Container,
             service_name=self.context.get().objectstore_service,
             parent_container=parent_container,
-            container_name=container_request.name.root,
+            container_name=fqn.quote_name(container_request.name.root),
         )
 
         self.container_source_state.add(container_fqn)
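Quoting the container name matters because unstructured file names can contain the FQN separator itself: without quoting, a name like `solved.png` would split the fully qualified name into an extra level. A hypothetical illustration of the idea, not the actual `fqn.quote_name` implementation:

```python
# Hypothetical stand-in for the quoting behaviour relied on above.
def quote_name(name: str) -> str:
    """Wrap names containing the FQN separator in double quotes."""
    return f'"{name}"' if "." in name and not name.startswith('"') else name


parts = ["s3-service", "test-bucket", quote_name("solved.png")]
print(".".join(parts))  # s3-service.test-bucket."solved.png"
```

The quoted form is exactly what the integration test further below looks up.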
@@ -5,6 +5,9 @@
             "structureFormat": "csv",
             "isPartitioned": false
         },
+        {
+            "dataPath": "solved.png"
+        },
         {
             "dataPath": "transactions_separator",
             "structureFormat": "csv",
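This manifest change captures the whole contract for unstructured files: an entry that omits `structureFormat` is ingested as a plain object instead of being parsed into a data model. Sketched as Python dicts mirroring the test resource (values are illustrative):

```python
# Entries mirroring the test manifest; only dataPath is needed for an unstructured file.
structured_entry = {
    "dataPath": "transactions_separator",
    "structureFormat": "csv",
    "isPartitioned": False,
}
unstructured_entry = {
    "dataPath": "solved.png",  # no structureFormat: treated as an unstructured file
}
```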
BIN  ingestion/tests/integration/s3/resources/solved.png  (new binary file, 1.2 KiB, not shown)
@@ -34,7 +34,7 @@ def test_s3_ingestion(metadata, ingest_s3_storage, service_name):
         entity=Container, fqn=f"{service_name}.test-bucket", fields=["*"]
     )
     # The bucket has children and no dataModel
-    assert 5 == len(bucket.children.root)
+    assert 6 == len(bucket.children.root)
     assert not bucket.dataModel
 
     # We can validate the children
@@ -78,3 +78,11 @@ def test_s3_ingestion(metadata, ingest_s3_storage, service_name):
     assert not transactions_separator.dataModel.isPartitioned
     assert 2 == len(transactions_separator.dataModel.columns)
     assert FileFormat.csv in transactions_separator.fileFormats
+
+    png_file: Container = metadata.get_by_name(
+        entity=Container,
+        fqn=f'{service_name}.test-bucket."solved.png"',
+        fields=["*"],
+    )
+    assert not png_file.dataModel
+    assert png_file.size > 1000