diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
index d026b95e17c..cce0661a713 100644
--- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py
@@ -14,15 +14,17 @@ import secrets
 import traceback
 from datetime import datetime, timedelta
 from enum import Enum
-from typing import Iterable, List, Optional
+from typing import Dict, Iterable, List, Optional
 
 from pandas import DataFrame
-from pydantic import Extra, Field, ValidationError
-from pydantic.main import BaseModel
+from pydantic import ValidationError
 
 from metadata.generated.schema.api.data.createContainer import CreateContainerRequest
 from metadata.generated.schema.entity.data import container
-from metadata.generated.schema.entity.data.container import ContainerDataModel
+from metadata.generated.schema.entity.data.container import (
+    Container,
+    ContainerDataModel,
+)
 from metadata.generated.schema.entity.data.table import Column
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
@@ -37,9 +39,14 @@ from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
 from metadata.ingestion.source.database.datalake.models import DatalakeColumnWrapper
+from metadata.ingestion.source.storage.s3.models import (
+    S3BucketResponse,
+    S3ContainerDetails,
+)
 from metadata.ingestion.source.storage.storage_service import StorageServiceSource
 from metadata.utils.filters import filter_by_container
 from metadata.utils.logger import ingestion_logger
@@ -56,52 +63,6 @@ class S3Metric(Enum):
     BUCKET_SIZE_BYTES = "BucketSizeBytes"
 
 
-class S3BucketResponse(BaseModel):
-    """
-    Class modelling a response received from s3_client.list_buckets operation
-    """
-
-    class Config:
-        extra = Extra.forbid
-
-    name: str = Field(..., description="Bucket name", title="Bucket Name", alias="Name")
-    creation_date: datetime = Field(
-        ...,
-        description="Timestamp of Bucket creation in ISO format",
-        title="Creation Timestamp",
-        alias="CreationDate",
-    )
-
-
-class S3ContainerDetails(BaseModel):
-    """Class mapping container details used to create the container requests"""
-
-    class Config:
-        extra = Extra.forbid
-
-    name: str = Field(..., description="Bucket name", title="Bucket Name")
-    prefix: str = Field(..., description="Prefix for the container", title="Prefix")
-    number_of_objects: float = Field(
-        ..., description="Total nr. of objects", title="Nr. of objects"
-    )
-    size: float = Field(
-        ...,
-        description="Total size in bytes of all objects",
-        title="Total size(bytes) of objects",
-    )
-    file_formats: Optional[List[container.FileFormat]] = Field(
-        ..., description="File formats", title="File formats"
-    )
-    data_model: Optional[ContainerDataModel] = Field(
-        ..., description="Data Model of the container", title="Data Model"
-    )
-    creation_date: str = Field(
-        ...,
-        description="Timestamp of Bucket creation in ISO format",
-        title="Creation Timestamp",
-    )
-
-
 class S3Source(StorageServiceSource):
     """
     Source implementation to ingest S3 buckets data.
@@ -112,6 +73,8 @@ class S3Source(StorageServiceSource):
         self.s3_client = self.connection.s3_client
         self.cloudwatch_client = self.connection.cloudwatch_client
 
+        self._bucket_cache: Dict[str, Container] = {}
+
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataConnection):
         config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@@ -127,6 +90,13 @@ class S3Source(StorageServiceSource):
 
         for bucket_response in bucket_results:
             try:
+
+                # We always try to generate the parent container (the bucket)
+                yield self._generate_unstructured_container(
+                    bucket_response=bucket_response
+                )
+                self._bucket_cache[bucket_response.name] = self.context.container
+
                 metadata_config = self._load_metadata_file(
                     bucket_name=bucket_response.name
                 )
@@ -141,16 +111,14 @@ class S3Source(StorageServiceSource):
                     ] = self._generate_container_details(
                         bucket_response=bucket_response,
                         metadata_entry=metadata_entry,
+                        parent=EntityReference(
+                            id=self._bucket_cache[bucket_response.name].id.__root__,
+                            type="container",
+                        ),
                     )
                     if structured_container:
                         yield structured_container
-                else:
-                    logger.info(
-                        f"No metadata found for bucket {bucket_response.name}, generating unstructured container.."
-                    )
-                    yield self._generate_unstructured_container(
-                        bucket_response=bucket_response
-                    )
+
             except ValidationError as err:
                 error = f"Validation error while creating Container from bucket details - {err}"
                 logger.debug(traceback.format_exc())
@@ -174,10 +142,14 @@ class S3Source(StorageServiceSource):
             size=container_details.size,
             dataModel=container_details.data_model,
             service=self.context.objectstore_service.fullyQualifiedName,
+            parent=container_details.parent,
         )
 
     def _generate_container_details(
-        self, bucket_response: S3BucketResponse, metadata_entry: MetadataEntry
+        self,
+        bucket_response: S3BucketResponse,
+        metadata_entry: MetadataEntry,
+        parent: Optional[EntityReference] = None,
     ) -> Optional[S3ContainerDetails]:
         bucket_name = bucket_response.name
         sample_key = self._get_sample_file_path(
@@ -188,7 +160,7 @@ class S3Source(StorageServiceSource):
         columns = self.extract_column_definitions(bucket_name, sample_key)
         if columns:
             return S3ContainerDetails(
-                name=f"{bucket_name}.{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}",
+                name=metadata_entry.dataPath.strip(S3_KEY_SEPARATOR),
                 prefix=f"{S3_KEY_SEPARATOR}{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}",
                 creation_date=bucket_response.creation_date.isoformat(),
                 number_of_objects=self._fetch_metric(
@@ -201,6 +173,7 @@ class S3Source(StorageServiceSource):
                 data_model=ContainerDataModel(
                     isPartitioned=metadata_entry.isPartitioned, columns=columns
                 ),
+                parent=parent,
             )
         return None
 
diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/models.py b/ingestion/src/metadata/ingestion/source/storage/s3/models.py
new file mode 100644
index 00000000000..76b041d8f7a
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/storage/s3/models.py
@@ -0,0 +1,76 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+"""
+S3 custom pydantic models
+"""
+from datetime import datetime
+from typing import List, Optional
+
+from pydantic import BaseModel, Extra, Field
+
+from metadata.generated.schema.entity.data.container import (
+    ContainerDataModel,
+    FileFormat,
+)
+from metadata.generated.schema.type.entityReference import EntityReference
+
+
+class S3BucketResponse(BaseModel):
+    """
+    Class modelling a response received from s3_client.list_buckets operation
+    """
+
+    class Config:
+        extra = Extra.forbid
+
+    name: str = Field(..., description="Bucket name", alias="Name")
+    creation_date: datetime = Field(
+        ...,
+        description="Timestamp of Bucket creation in ISO format",
+        alias="CreationDate",
+    )
+
+
+class S3ContainerDetails(BaseModel):
+    """
+    Class mapping container details used to create the container requests
+    """
+
+    class Config:
+        extra = Extra.forbid
+
+    name: str = Field(..., description="Bucket name")
+    prefix: str = Field(..., description="Prefix for the container")
+    number_of_objects: float = Field(
+        ...,
+        description="Total nr. of objects",
+    )
+    size: float = Field(
+        ...,
+        description="Total size in bytes of all objects",
+        title="Total size(bytes) of objects",
+    )
+    file_formats: Optional[List[FileFormat]] = Field(
+        ...,
+        description="File formats",
+    )
+    data_model: Optional[ContainerDataModel] = Field(
+        ...,
+        description="Data Model of the container",
+    )
+    creation_date: str = Field(
+        ...,
+        description="Timestamp of Bucket creation in ISO format",
+    )
+    parent: Optional[EntityReference] = Field(
+        None,
+        description="Reference to the parent container",
+    )
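Note on the relocated models: the new `parent` field on `S3ContainerDetails` is what carries the bucket/child relationship into the `CreateContainerRequest`. A minimal sketch of constructing the model by hand (field values are made up for illustration; in the source they come from S3 listings and CloudWatch metrics):

```python
import uuid

from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.storage.s3.models import S3ContainerDetails

# Reference to the already-ingested bucket container; S3Source reads this
# id from self._bucket_cache[bucket_name].id.__root__ after yielding the
# unstructured parent container.
bucket_ref = EntityReference(id=uuid.uuid4(), type="container")

child = S3ContainerDetails(
    name="transactions",  # just the data path now, no "<bucket>." prefix
    prefix="/transactions",
    number_of_objects=100,  # illustrative; normally CloudWatch metrics
    size=2048,
    file_formats=None,
    data_model=None,  # populated when columns could be inferred
    creation_date="2000-01-01T00:00:00",
    parent=bucket_ref,  # ties the child container to its bucket
)
assert child.parent.type == "container"
```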
+""" +S3 custom pydantic models +""" +from datetime import datetime +from typing import List, Optional + +from pydantic import BaseModel, Extra, Field + +from metadata.generated.schema.entity.data.container import ( + ContainerDataModel, + FileFormat, +) +from metadata.generated.schema.type.entityReference import EntityReference + + +class S3BucketResponse(BaseModel): + """ + Class modelling a response received from s3_client.list_buckets operation + """ + + class Config: + extra = Extra.forbid + + name: str = Field(..., description="Bucket name", alias="Name") + creation_date: datetime = Field( + ..., + description="Timestamp of Bucket creation in ISO format", + alias="CreationDate", + ) + + +class S3ContainerDetails(BaseModel): + """ + Class mapping container details used to create the container requests + """ + + class Config: + extra = Extra.forbid + + name: str = Field(..., description="Bucket name") + prefix: str = Field(..., description="Prefix for the container") + number_of_objects: float = Field( + ..., + description="Total nr. of objects", + ) + size: float = Field( + ..., + description="Total size in bytes of all objects", + title="Total size(bytes) of objects", + ) + file_formats: Optional[List[FileFormat]] = Field( + ..., + description="File formats", + ) + data_model: Optional[ContainerDataModel] = Field( + ..., + description="Data Model of the container", + ) + creation_date: str = Field( + ..., + description="Timestamp of Bucket creation in ISO format", + ) + parent: Optional[EntityReference] = Field( + None, + description="Reference to the parent container", + ) diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 889c1ff3eb0..4e2415488fd 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -66,7 +66,7 @@ class StorageServiceTopology(ServiceTopology): stages=[ NodeStage( type_=Container, - context="containers", + context="container", processor="yield_create_container_requests", consumer=["objectstore_service"], nullable=True, diff --git a/ingestion/tests/unit/topology/storage/test_storage.py b/ingestion/tests/unit/topology/storage/test_storage.py index 3e56951fc4c..ab7ae93643e 100644 --- a/ingestion/tests/unit/topology/storage/test_storage.py +++ b/ingestion/tests/unit/topology/storage/test_storage.py @@ -14,6 +14,7 @@ Unit tests for Object store source import datetime import io import json +import uuid from typing import List from unittest import TestCase from unittest.mock import patch @@ -33,6 +34,7 @@ from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.datalake.metadata import DatalakeSource from metadata.ingestion.source.storage.s3.metadata import ( @@ -218,15 +220,19 @@ class StorageUnitTest(TestCase): self.object_store_source.extract_column_definitions = ( lambda bucket_name, sample_key: columns ) + + entity_ref = EntityReference(id=uuid.uuid4(), type="container") + self.assertEquals( S3ContainerDetails( - name="test_bucket.transactions", + name="transactions", prefix="/transactions", number_of_objects=100, size=100, file_formats=[FileFormat.csv], 
diff --git a/ingestion/tests/unit/topology/storage/test_storage.py b/ingestion/tests/unit/topology/storage/test_storage.py
index 3e56951fc4c..ab7ae93643e 100644
--- a/ingestion/tests/unit/topology/storage/test_storage.py
+++ b/ingestion/tests/unit/topology/storage/test_storage.py
@@ -14,6 +14,7 @@ Unit tests for Object store source
 import datetime
 import io
 import json
+import uuid
 from typing import List
 from unittest import TestCase
 from unittest.mock import patch
@@ -33,6 +34,7 @@ from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
 )
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.source import InvalidSourceException
 from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
 from metadata.ingestion.source.storage.s3.metadata import (
@@ -218,15 +220,19 @@ class StorageUnitTest(TestCase):
         self.object_store_source.extract_column_definitions = (
             lambda bucket_name, sample_key: columns
         )
+
+        entity_ref = EntityReference(id=uuid.uuid4(), type="container")
+
         self.assertEquals(
             S3ContainerDetails(
-                name="test_bucket.transactions",
+                name="transactions",
                 prefix="/transactions",
                 number_of_objects=100,
                 size=100,
                 file_formats=[FileFormat.csv],
                 data_model=ContainerDataModel(isPartitioned=False, columns=columns),
                 creation_date=datetime.datetime(2000, 1, 1).isoformat(),
+                parent=entity_ref,
             ),
             self.object_store_source._generate_container_details(
                 S3BucketResponse(
@@ -237,6 +243,7 @@
                 structureFormat="csv",
                 isPartitioned=False,
             ),
+            parent=entity_ref,
         ),
     )
 
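The updated test pins down the new contract: `_generate_container_details` takes an optional `parent` reference, and the structured container's name is the bare data path rather than `<bucket>.<path>`. For completeness, a rough sketch of the emission order the reworked `get_containers` produces, using plain stand-in types rather than the generated entities (names here are illustrative):

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Optional


@dataclass
class ContainerStub:
    """Illustrative stand-in for the Container entity."""

    id: str
    name: str
    parent_id: Optional[str] = None


def emit_containers(buckets: Dict[str, List[str]]) -> Iterator[ContainerStub]:
    """Yield each bucket as a parent first, then its structured
    prefixes as children pointing back at the cached parent."""
    bucket_cache: Dict[str, ContainerStub] = {}
    for bucket_name, data_paths in buckets.items():
        parent = ContainerStub(id=f"id-{bucket_name}", name=bucket_name)
        bucket_cache[bucket_name] = parent  # mirrors self._bucket_cache
        yield parent
        for path in data_paths:
            yield ContainerStub(
                id=f"id-{bucket_name}-{path}",
                name=path.strip("/"),  # data path only, no bucket prefix
                parent_id=bucket_cache[bucket_name].id,
            )


for c in emit_containers({"test_bucket": ["/transactions"]}):
    print(c.name, "->", c.parent_id)
# test_bucket -> None
# transactions -> id-test_bucket
```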