mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-31 10:39:30 +00:00 
			
		
		
		
	issue-20074: s3 objects get paginated response (#21208)
This commit is contained in:
		
							parent
							
								
									35c1f5aead
								
							
						
					
					
						commit
						9c9e885d77
					
				| @ -64,6 +64,7 @@ from metadata.readers.file.config_source_factory import get_reader | |||||||
| from metadata.utils import fqn | from metadata.utils import fqn | ||||||
| from metadata.utils.filters import filter_by_container | from metadata.utils.filters import filter_by_container | ||||||
| from metadata.utils.logger import ingestion_logger | from metadata.utils.logger import ingestion_logger | ||||||
|  | from metadata.utils.s3_utils import list_s3_objects | ||||||
| from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_label | from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_label | ||||||
| 
 | 
 | ||||||
| logger = ingestion_logger() | logger = ingestion_logger() | ||||||
| @ -345,14 +346,13 @@ class S3Source(StorageServiceSource): | |||||||
|         try: |         try: | ||||||
|             prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry) |             prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry) | ||||||
|             if prefix: |             if prefix: | ||||||
|                 response = self.s3_client.list_objects_v2( |                 kwargs = {"Bucket": bucket_response.name, "Prefix": prefix} | ||||||
|                     Bucket=bucket_response.name, Prefix=prefix |                 response = list_s3_objects(self.s3_client, **kwargs) | ||||||
|                 ) |  | ||||||
|                 # total depth is depth of prefix + depth of the metadata entry |                 # total depth is depth of prefix + depth of the metadata entry | ||||||
|                 total_depth = metadata_entry.depth + len(prefix[:-1].split("/")) |                 total_depth = metadata_entry.depth + len(prefix[:-1].split("/")) | ||||||
|                 candidate_keys = { |                 candidate_keys = { | ||||||
|                     "/".join(entry.get("Key").split("/")[:total_depth]) + "/" |                     "/".join(entry.get("Key").split("/")[:total_depth]) + "/" | ||||||
|                     for entry in response[S3_CLIENT_ROOT_RESPONSE] |                     for entry in response | ||||||
|                     if entry |                     if entry | ||||||
|                     and entry.get("Key") |                     and entry.get("Key") | ||||||
|                     and len(entry.get("Key").split("/")) > total_depth |                     and len(entry.get("Key").split("/")) > total_depth | ||||||
| @ -464,12 +464,11 @@ class S3Source(StorageServiceSource): | |||||||
|         parent: Optional[EntityReference] = None, |         parent: Optional[EntityReference] = None, | ||||||
|     ): |     ): | ||||||
|         bucket_name = bucket_response.name |         bucket_name = bucket_response.name | ||||||
|         response = self.s3_client.list_objects_v2( |         kwargs = {"Bucket": bucket_name, "Prefix": metadata_entry.dataPath} | ||||||
|             Bucket=bucket_name, Prefix=metadata_entry.dataPath |         response = list_s3_objects(self.s3_client, **kwargs) | ||||||
|         ) |  | ||||||
|         candidate_keys = [ |         candidate_keys = [ | ||||||
|             entry["Key"] |             entry["Key"] | ||||||
|             for entry in response[S3_CLIENT_ROOT_RESPONSE] |             for entry in response | ||||||
|             if entry and entry.get("Key") and not entry.get("Key").endswith("/") |             if entry and entry.get("Key") and not entry.get("Key").endswith("/") | ||||||
|         ] |         ] | ||||||
|         for key in candidate_keys: |         for key in candidate_keys: | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 harshsoni2024
						harshsoni2024