diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
index e82df1002ff..ac876f67057 100644
--- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
@@ -176,6 +176,18 @@ class S3Source(StorageServiceSource):
             yield Either(right=container_request)
             self.register_record(container_request=container_request)
 
+    def get_size(self, bucket_name: str, file_path: str) -> Optional[float]:
+        """
+        Return the object's size in bytes, or None if the HEAD request fails
+        """
+        try:
+            file_obj = self.s3_client.head_object(Bucket=bucket_name, Key=file_path)
+            return file_obj["ContentLength"]
+        except Exception as exc:
+            logger.debug(f"Failed to get size of file [{file_path}] due to {exc}")
+            logger.debug(traceback.format_exc())
+            return None
+
     def _generate_container_details(
         self,
         bucket_response: S3BucketResponse,
@@ -183,6 +195,29 @@ class S3Source(StorageServiceSource):
         parent: Optional[EntityReference] = None,
     ) -> Optional[S3ContainerDetails]:
         bucket_name = bucket_response.name
+        object_size = self.get_size(
+            bucket_name=bucket_name,
+            file_path=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+        )
+        if not metadata_entry.structureFormat and object_size:
+            # Unstructured object: emit a leaf container with its size only
+            prefix = f"{KEY_SEPARATOR}{metadata_entry.dataPath.strip(KEY_SEPARATOR)}"
+            return S3ContainerDetails(
+                name=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+                prefix=prefix,
+                creation_date=bucket_response.creation_date.isoformat()
+                if bucket_response.creation_date
+                else None,
+                file_formats=[],
+                data_model=None,
+                parent=parent,
+                size=object_size,
+                fullPath=self._get_full_path(bucket_name, prefix),
+                sourceUrl=self._get_object_source_url(
+                    bucket_name=bucket_name,
+                    prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
+                ),
+            )
         sample_key = self._get_sample_file_path(
             bucket_name=bucket_name, metadata_entry=metadata_entry
         )
diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/models.py b/ingestion/src/metadata/ingestion/source/storage/s3/models.py
index 0969b54f071..faa5d25b1f8 100644
--- a/ingestion/src/metadata/ingestion/source/storage/s3/models.py
+++ b/ingestion/src/metadata/ingestion/source/storage/s3/models.py
@@ -52,21 +52,21 @@ class S3ContainerDetails(BaseModel):
 
     name: str = Field(..., description="Bucket name")
     prefix: str = Field(..., description="Prefix for the container")
-    number_of_objects: float = Field(
-        ...,
+    number_of_objects: Optional[float] = Field(
+        None,
         description="Total nr. of objects",
     )
-    size: float = Field(
-        ...,
+    size: Optional[float] = Field(
+        None,
         description="Total size in bytes of all objects",
         title="Total size(bytes) of objects",
     )
     file_formats: Optional[List[FileFormat]] = Field(
-        ...,
+        None,
         description="File formats",
     )
     data_model: Optional[ContainerDataModel] = Field(
-        ...,
+        None,
         description="Data Model of the container",
     )
     creation_date: Optional[str] = Field(
diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py
index 838624dd9b8..0c11ae7d2a6 100644
--- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py
+++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py
@@ -205,7 +205,7 @@ class StorageServiceSource(TopologyRunnerMixin, Source, ABC):
             entity_type=Container,
             service_name=self.context.get().objectstore_service,
             parent_container=parent_container,
-            container_name=container_request.name.root,
+            container_name=fqn.quote_name(container_request.name.root),
         )
 
         self.container_source_state.add(container_fqn)
diff --git a/ingestion/tests/integration/s3/resources/openmetadata.json b/ingestion/tests/integration/s3/resources/openmetadata.json
index 2bd3d22cea7..1faa7f9bb95 100644
--- a/ingestion/tests/integration/s3/resources/openmetadata.json
+++ b/ingestion/tests/integration/s3/resources/openmetadata.json
@@ -5,6 +5,9 @@
       "structureFormat": "csv",
       "isPartitioned": false
     },
+    {
+      "dataPath": "solved.png"
+    },
     {
       "dataPath": "transactions_separator",
       "structureFormat": "csv",
diff --git a/ingestion/tests/integration/s3/resources/solved.png b/ingestion/tests/integration/s3/resources/solved.png
new file mode 100644
index 00000000000..4ddf4fdfa3e
Binary files /dev/null and b/ingestion/tests/integration/s3/resources/solved.png differ
diff --git a/ingestion/tests/integration/s3/test_s3_storage.py b/ingestion/tests/integration/s3/test_s3_storage.py
index 23759a7b2fe..b41047e508c 100644
--- a/ingestion/tests/integration/s3/test_s3_storage.py
+++ b/ingestion/tests/integration/s3/test_s3_storage.py
@@ -34,7 +34,7 @@ def test_s3_ingestion(metadata, ingest_s3_storage, service_name):
         entity=Container, fqn=f"{service_name}.test-bucket", fields=["*"]
     )
     # The bucket has children and no dataModel
-    assert 5 == len(bucket.children.root)
+    assert 6 == len(bucket.children.root)
     assert not bucket.dataModel
 
     # We can validate the children
@@ -78,3 +78,11 @@ def test_s3_ingestion(metadata, ingest_s3_storage, service_name):
    assert not transactions_separator.dataModel.isPartitioned
    assert 2 == len(transactions_separator.dataModel.columns)
    assert FileFormat.csv in transactions_separator.fileFormats
+
+    png_file: Container = metadata.get_by_name(
+        entity=Container,
+        fqn=f'{service_name}.test-bucket."solved.png"',
+        fields=["*"],
+    )
+    assert not png_file.dataModel
+    assert png_file.size > 1000
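
Reviewer note: the new sizing path boils down to one HEAD request per object. Below is a minimal standalone sketch of that call for poking at it outside the connector, assuming default boto3 credentials and a reachable bucket; the bucket/key values and the helper name get_object_size are placeholders for illustration, not part of the patch.

import traceback
from typing import Optional

import boto3

s3_client = boto3.client("s3")


def get_object_size(bucket_name: str, file_path: str) -> Optional[int]:
    """Mirror of S3Source.get_size: HEAD the object and read ContentLength."""
    try:
        response = s3_client.head_object(Bucket=bucket_name, Key=file_path)
        return response["ContentLength"]  # size in bytes
    except Exception:
        # The connector logs the failure and returns None; do the same here
        traceback.print_exc()
        return None


# e.g. the integration test uploads resources/solved.png into test-bucket
print(get_object_size("test-bucket", "solved.png"))

Since "solved.png" contains a dot, the container name must be quoted when building the FQN; that is what the fqn.quote_name change in storage_service.py handles, and why the test looks the container up as test-bucket."solved.png".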