mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-04 11:27:10 +00:00
Add partition columns details (#11062)
This commit is contained in:
parent
8f3cf712ff
commit
5152db488d
@ -157,7 +157,12 @@ class S3Source(StorageServiceSource):
|
||||
)
|
||||
# if we have a sample file to fetch a schema from
|
||||
if sample_key:
|
||||
columns = self.extract_column_definitions(bucket_name, sample_key)
|
||||
|
||||
columns = self._get_columns(
|
||||
bucket_name=bucket_name,
|
||||
sample_key=sample_key,
|
||||
metadata_entry=metadata_entry,
|
||||
)
|
||||
if columns:
|
||||
return S3ContainerDetails(
|
||||
name=metadata_entry.dataPath.strip(S3_KEY_SEPARATOR),
|
||||
@ -177,6 +182,15 @@ class S3Source(StorageServiceSource):
|
||||
)
|
||||
return None
|
||||
|
||||
def _get_columns(
|
||||
self, bucket_name: str, sample_key: str, metadata_entry: MetadataEntry
|
||||
) -> Optional[List[Column]]:
|
||||
"""
|
||||
Get the columns from the file and partition information
|
||||
"""
|
||||
extracted_cols = self.extract_column_definitions(bucket_name, sample_key)
|
||||
return (metadata_entry.partitionColumns or []) + (extracted_cols or [])
|
||||
|
||||
def extract_column_definitions(
|
||||
self, bucket_name: str, sample_key: str
|
||||
) -> List[Column]:
|
||||
@ -305,14 +319,13 @@ class S3Source(StorageServiceSource):
|
||||
|
||||
@staticmethod
|
||||
def _get_sample_file_prefix(metadata_entry: MetadataEntry) -> Optional[str]:
|
||||
"""
|
||||
Return a prefix if we have structure data to read
|
||||
"""
|
||||
result = f"{metadata_entry.dataPath.strip(S3_KEY_SEPARATOR)}"
|
||||
if not metadata_entry.structureFormat:
|
||||
logger.warning(f"Ignoring un-structured metadata entry {result}")
|
||||
return None
|
||||
if metadata_entry.isPartitioned and metadata_entry.partitionColumn:
|
||||
result = (
|
||||
f"{result}/{metadata_entry.partitionColumn.strip(S3_KEY_SEPARATOR)}"
|
||||
)
|
||||
return result
|
||||
|
||||
def _get_sample_file_path(
|
||||
|
||||
@ -283,10 +283,10 @@ class StorageUnitTest(TestCase):
|
||||
dataPath="transactions",
|
||||
structureFormat="parquet",
|
||||
isPartitioned=True,
|
||||
partitionColumn="date",
|
||||
partitionColumns=[Column(name="date", dataType=DataType.DATE)],
|
||||
)
|
||||
self.assertEquals(
|
||||
"transactions/date",
|
||||
"transactions",
|
||||
self.object_store_source._get_sample_file_prefix(
|
||||
metadata_entry=input_metadata
|
||||
),
|
||||
|
||||
@ -27,10 +27,13 @@
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
},
|
||||
"partitionColumn": {
|
||||
"title": "Partition Column",
|
||||
"description": "What is the partition column in case the container's data is partitioned",
|
||||
"type": "string",
|
||||
"partitionColumns": {
|
||||
"title": "Partition Columns",
|
||||
"description": "What are the partition columns in case the container's data is partitioned",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "../../entity/data/table.json#/definitions/column"
|
||||
},
|
||||
"default": null
|
||||
}
|
||||
},
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user