Fix #19160: Add Depth Support for Structured Containers (#19288)

Mayur Singal 2025-01-09 18:14:52 +05:30 committed by GitHub
parent e32dbe00d4
commit be3fdedc76
3 changed files with 146 additions and 9 deletions

View File

@@ -12,6 +12,7 @@
import json
import secrets
import traceback
from copy import deepcopy
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple
@@ -336,6 +337,45 @@ class S3Source(StorageServiceSource):
            )
        return None

    def _generate_structured_containers_by_depth(
        self,
        bucket_response: S3BucketResponse,
        metadata_entry: MetadataEntry,
        parent: Optional[EntityReference] = None,
    ) -> Iterable[S3ContainerDetails]:
        try:
            prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry)
            if prefix:
                response = self.s3_client.list_objects_v2(
                    Bucket=bucket_response.name, Prefix=prefix
                )
                # total depth is depth of prefix + depth of the metadata entry
                total_depth = metadata_entry.depth + len(prefix[:-1].split("/"))
                candidate_keys = {
                    "/".join(entry.get("Key").split("/")[:total_depth]) + "/"
                    for entry in response[S3_CLIENT_ROOT_RESPONSE]
                    if entry
                    and entry.get("Key")
                    and len(entry.get("Key").split("/")) > total_depth
                }
                for key in candidate_keys:
                    metadata_entry_copy = deepcopy(metadata_entry)
                    metadata_entry_copy.dataPath = key.strip(KEY_SEPARATOR)
                    structured_container: Optional[
                        S3ContainerDetails
                    ] = self._generate_container_details(
                        bucket_response=bucket_response,
                        metadata_entry=metadata_entry_copy,
                        parent=parent,
                    )
                    if structured_container:
                        yield structured_container
        except Exception as err:
            logger.warning(
                f"Error while generating structured containers by depth for {metadata_entry.dataPath} - {err}"
            )
            logger.debug(traceback.format_exc())

    def _generate_structured_containers(
        self,
        bucket_response: S3BucketResponse,
@@ -347,15 +387,22 @@ class S3Source(StorageServiceSource):
                f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
                f"and generating structured container"
            )
            structured_container: Optional[
                S3ContainerDetails
            ] = self._generate_container_details(
                bucket_response=bucket_response,
                metadata_entry=metadata_entry,
                parent=parent,
            )
            if structured_container:
                yield structured_container
            if metadata_entry.depth == 0:
                structured_container: Optional[
                    S3ContainerDetails
                ] = self._generate_container_details(
                    bucket_response=bucket_response,
                    metadata_entry=metadata_entry,
                    parent=parent,
                )
                if structured_container:
                    yield structured_container
            else:
                yield from self._generate_structured_containers_by_depth(
                    bucket_response=bucket_response,
                    metadata_entry=metadata_entry,
                    parent=parent,
                )

    def is_valid_unstructured_file(self, accepted_extensions: List, key: str) -> bool:
        # Split the string into a list of values
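
For reference, a minimal standalone sketch of the key-truncation idea used above. It mirrors the candidate-key derivation in `_generate_structured_containers_by_depth` but skips the S3 call and the `MetadataEntry` plumbing; the keys, prefix, and depth values are illustrative, taken from the sample hierarchy in the docs below.

```python
from typing import List, Set


def candidate_container_paths(keys: List[str], prefix: str, depth: int) -> Set[str]:
    """Truncate object keys to `depth` levels below the prefix."""
    # total depth = number of segments in the prefix + configured depth
    total_depth = depth + len(prefix.rstrip("/").split("/"))
    return {
        "/".join(key.split("/")[:total_depth])
        for key in keys
        if key and len(key.split("/")) > total_depth
    }


sample_keys = [
    "athena_service/my_database_a/my_schema_a/table_a/date=01-01-2025/data.parquet",
    "athena_service/my_database_a/my_schema_a/table_b/date=01-01-2025/data.parquet",
    "athena_service/my_database_b/my_schema_a/table_a/date=01-01-2025/data.parquet",
]

# With depth=3 every table folder becomes a candidate container path
print(sorted(candidate_container_paths(sample_keys, "athena_service/", depth=3)))
# ['athena_service/my_database_a/my_schema_a/table_a',
#  'athena_service/my_database_a/my_schema_a/table_b',
#  'athena_service/my_database_b/my_schema_a/table_a']
```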

View File

@@ -57,6 +57,43 @@ Again, this information will be added on top of the inferred schema from the dat
{% /codeInfo %}
{% codeInfo srNumber=7 %}
**Automated Container Ingestion**: Registering all the data paths one by one can be a time-consuming job.
To automate structured container ingestion, you can instead provide the depth at which all the data is available.
Let us understand this with an example: suppose the following is the file hierarchy within your bucket.
```
# prefix/depth1/depth2/depth3
athena_service/my_database_a/my_schema_a/table_a/date=01-01-2025/data.parquet
athena_service/my_database_a/my_schema_a/table_a/date=02-01-2025/data.parquet
athena_service/my_database_a/my_schema_a/table_b/date=01-01-2025/data.parquet
athena_service/my_database_a/my_schema_a/table_b/date=02-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_a/date=01-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_a/date=02-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_b/date=01-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_b/date=02-01-2025/data.parquet
```
All the table folders which contain the actual data are available at depth 3. Hence, when you specify `depth: 3` in the
manifest entry, all of the following paths will get registered as containers in OpenMetadata with this single entry:
```
athena_service/my_database_a/my_schema_a/table_a
athena_service/my_database_a/my_schema_a/table_b
athena_service/my_database_b/my_schema_a/table_a
athena_service/my_database_b/my_schema_a/table_b
```
saving you the effort of adding 4 individual entries by defining just 1.
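For comparison (an illustrative sketch following the depth numbering in the comment above), specifying `depth: 2` for the same hierarchy would register the schema-level folders instead:
```
athena_service/my_database_a/my_schema_a
athena_service/my_database_b/my_schema_a
```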
{% /codeInfo %}
{% codeInfo srNumber=6 %}
**Unstructured Container**: OpenMetadata supports ingesting unstructured files like images, PDFs, etc. We support fetching the file names, sizes, and tags associated with such files.
@@ -124,6 +161,14 @@ In case you want to ingest all unstructured files with irrespective of their fil
]
}
```
```json {% srNumber=7 %}
{
    "dataPath": "athena_service",
    "structureFormat": "parquet",
    "isPartitioned": true,
    "depth": 3
}
```
```json {% srNumber=6 %}
{
"dataPath": "path/to/solution.pdf",

View File

@@ -57,6 +57,43 @@ Again, this information will be added on top of the inferred schema from the dat
{% /codeInfo %}
{% codeInfo srNumber=7 %}
**Automated Container Ingestion**: Registering all the data paths one by one can be a time-consuming job.
To automate structured container ingestion, you can instead provide the depth at which all the data is available.
Let us understand this with an example: suppose the following is the file hierarchy within your bucket.
```
# prefix/depth1/depth2/depth3
athena_service/my_database_a/my_schema_a/table_a/date=01-01-2025/data.parquet
athena_service/my_database_a/my_schema_a/table_a/date=02-01-2025/data.parquet
athena_service/my_database_a/my_schema_a/table_b/date=01-01-2025/data.parquet
athena_service/my_database_a/my_schema_a/table_b/date=02-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_a/date=01-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_a/date=02-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_b/date=01-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_b/date=02-01-2025/data.parquet
```
All the table folders which contain the actual data are available at depth 3. Hence, when you specify `depth: 3` in the
manifest entry, all of the following paths will get registered as containers in OpenMetadata with this single entry:
```
athena_service/my_database_a/my_schema_a/table_a
athena_service/my_database_a/my_schema_a/table_b
athena_service/my_database_b/my_schema_a/table_a
athena_service/my_database_b/my_schema_a/table_b
```
saving you the effort of adding 4 individual entries by defining just 1.
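For comparison (an illustrative sketch following the depth numbering in the comment above), specifying `depth: 2` for the same hierarchy would register the schema-level folders instead:
```
athena_service/my_database_a/my_schema_a
athena_service/my_database_b/my_schema_a
```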
{% /codeInfo %}
{% codeInfo srNumber=6 %}
**Unstructured Container**: OpenMetadata supports ingesting unstructured files like images, PDFs, etc. We support fetching the file names, sizes, and tags associated with such files.
@@ -124,6 +161,14 @@ In case you want to ingest all unstructured files with irrespective of their fil
]
}
```
```json {% srNumber=7 %}
{
    "dataPath": "athena_service",
    "structureFormat": "parquet",
    "isPartitioned": true,
    "depth": 3
}
```
```json {% srNumber=6 %}
{
"dataPath": "path/to/solution.pdf",