diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index cce0661a713..a3438e7287c 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -157,7 +157,12 @@ class S3Source(StorageServiceSource): ) # if we have a sample file to fetch a schema from if sample_key: - columns = self.extract_column_definitions(bucket_name, sample_key) + + columns = self._get_columns( + bucket_name=bucket_name, + sample_key=sample_key, + metadata_entry=metadata_entry, + ) if columns: return S3ContainerDetails( name=metadata_entry.dataPath.strip(S3_KEY_SEPARATOR), @@ -177,6 +182,15 @@ class S3Source(StorageServiceSource): ) return None + def _get_columns( + self, bucket_name: str, sample_key: str, metadata_entry: MetadataEntry + ) -> Optional[List[Column]]: + """ + Get the columns from the file and partition information + """ + extracted_cols = self.extract_column_definitions(bucket_name, sample_key) + return (metadata_entry.partitionColumns or []) + (extracted_cols or []) + def extract_column_definitions( self, bucket_name: str, sample_key: str ) -> List[Column]: @@ -305,14 +319,13 @@ class S3Source(StorageServiceSource): @staticmethod def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]: + """ + Return a prefix if we have structure data to read + """ result = f"{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}" if not metadata_entry.structureFormat: logger.warning(f"Ignoring un-structured metadata entry {result}") return None - if metadata_entry.isPartitioned and metadata_entry.partitionColumn: - result = ( - f"{result}/{metadata_entry.partitionColumn.strip(S3_KEY_SEPARATOR)}" - ) return result def _get_sample_file_path( diff --git a/ingestion/tests/integration/ometa/test_ometa_objectstore_api.py b/ingestion/tests/integration/ometa/test_ometa_storage_api.py similarity index 100% rename from ingestion/tests/integration/ometa/test_ometa_objectstore_api.py rename to ingestion/tests/integration/ometa/test_ometa_storage_api.py diff --git a/ingestion/tests/unit/topology/storage/test_storage.py b/ingestion/tests/unit/topology/storage/test_storage.py index ab7ae93643e..4767776f556 100644 --- a/ingestion/tests/unit/topology/storage/test_storage.py +++ b/ingestion/tests/unit/topology/storage/test_storage.py @@ -283,10 +283,10 @@ class StorageUnitTest(TestCase): dataPath="transactions", structureFormat="parquet", isPartitioned=True, - partitionColumn="date", + partitionColumns=[Column(name="date", dataType=DataType.DATE)], ) self.assertEquals( - "transactions/date", + "transactions", self.object_store_source._get_sample_file_prefix( metadata_entry=input_metadata ), diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/containerMetadataConfig.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/containerMetadataConfig.json index 0e57d787555..21c98b56906 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/containerMetadataConfig.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/storage/containerMetadataConfig.json @@ -27,10 +27,13 @@ "type": "boolean", "default": false }, - "partitionColumn": { - "title": "Partition Column", - "description": "What is the partition column in case the container's data is partitioned", - "type": "string", + "partitionColumns": { + "title": "Partition Columns", + "description": "What are the partition columns in case the container's data is partitioned", + "type": "array", + "items": { + "$ref": "../../entity/data/table.json#/definitions/column" + }, "default": null } },