"""
|
|
This script is designed to manage and clean up contents in an S3 bucket, specifically targeting orphaned files and folders.
|
|
It provides functionality to list, delete, or simulate deletion of all objects under a specified S3 prefix using AWS assumed role credentials.
|
|
|
|
The script supports the following operations:
|
|
- Listing all files and folders under a specified S3 path.
|
|
- Deleting all contents under a specified S3 path.
|
|
- Performing a dry run to show what would be deleted without actually deleting the objects.
|
|
|
|
Environment variables required:
|
|
- DH_ICEBERG_AWS_ROLE: The ARN of the AWS role to assume.
|
|
- DH_ICEBERG_CLIENT_ID: The AWS client ID.
|
|
- DH_ICEBERG_CLIENT_SECRET: The AWS client secret.
|
|
|
|
Usage:
|
|
python folder_operations.py s3://bucket/prefix --list
|
|
python folder_operations.py s3://bucket/prefix --nuke
|
|
python folder_operations.py s3://bucket/prefix --dry-run
|
|
|
|
Arguments:
|
|
- s3_path: The S3 path to operate on (e.g., s3://bucket/prefix).
|
|
- --list: List all folders and files.
|
|
- --nuke: Delete all contents.
|
|
- --dry-run: Show what would be deleted without actually deleting.
|
|
- --region: AWS region (default: us-east-1).
|
|
|
|
Note: Only one action (--list, --nuke, or --dry-run) can be specified at a time.
|
|
|
|
"""

import argparse
import os
from datetime import datetime
from typing import Optional, Tuple

import boto3
from mypy_boto3_s3 import S3Client


def get_s3_client_with_role(
    client_id: str,
    client_secret: str,
    role_arn: str,
    region: str = "us-east-1",
    session_name: str = "IcebergSession",
) -> Tuple[S3Client, datetime]:  # type: ignore
    """
    Create an S3 client with assumed role credentials.
    """
    session = boto3.Session(
        aws_access_key_id=client_id,
        aws_secret_access_key=client_secret,
        region_name=region,
    )

    sts_client = session.client("sts")

    assumed_role_object = sts_client.assume_role(
        RoleArn=role_arn, RoleSessionName=session_name
    )

    credentials = assumed_role_object["Credentials"]

    s3_client: S3Client = boto3.client(
        "s3",
        region_name=region,
        aws_access_key_id=credentials["AccessKeyId"],
        aws_secret_access_key=credentials["SecretAccessKey"],
        aws_session_token=credentials["SessionToken"],
    )

    return s3_client, credentials["Expiration"]
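
# Sketch (hypothetical ARN and keys): assume the role and check when the
# temporary credentials expire before starting a long-running cleanup.
#
#     client, expires = get_s3_client_with_role(
#         client_id="AKIA...",  # hypothetical base access key
#         client_secret="...",
#         role_arn="arn:aws:iam::123456789012:role/IcebergCleanup",  # hypothetical
#     )
#     print(f"Credentials valid until {expires}")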


def delete_s3_objects(
    s3_client: S3Client, bucket_name: str, prefix: str, dry_run: bool = False
) -> None:
    """
    Delete all objects under the specified prefix.
    """
    paginator = s3_client.get_paginator("list_objects_v2")

    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        objects_to_delete = []
        for obj in page.get("Contents", []):
            objects_to_delete.append({"Key": obj["Key"]})
            if dry_run:
                print(f"Would delete: {obj['Key']}")
                print(f"  Size: {obj['Size'] / (1024 * 1024):.2f} MB")
                print(f"  Last Modified: {obj['LastModified']}")

        if objects_to_delete and not dry_run:
            s3_client.delete_objects(
                Bucket=bucket_name,
                Delete={"Objects": objects_to_delete},  # type: ignore
            )
            print(f"Deleted {len(objects_to_delete)} objects")


def list_s3_contents(
    s3_path: str,
    client_id: str,
    client_secret: str,
    role_arn: str,
    region: str = "us-east-1",
    delimiter: Optional[str] = None,
    nuke: bool = False,
    dry_run: bool = False,
) -> None:
    """
    List or delete contents of an S3 path using assumed role credentials.
    """
    if not s3_path.startswith("s3://"):
        raise ValueError("S3 path must start with 's3://'")

    bucket_name = s3_path.split("/")[2]
    prefix = "/".join(s3_path.split("/")[3:])
    if prefix and not prefix.endswith("/"):
        prefix += "/"

    s3_client, expiration = get_s3_client_with_role(
        client_id=client_id,
        client_secret=client_secret,
        role_arn=role_arn,
        region=region,
    )

    operation = "Deleting" if nuke else "Would delete" if dry_run else "Listing"
    print(f"\n{operation} contents of {s3_path}")
    print(f"Using role: {role_arn}")
    print(f"Credentials expire at: {expiration}")
    print("-" * 60)

    if nuke or dry_run:
        delete_s3_objects(s3_client, bucket_name, prefix, dry_run)
        return

    paginator = s3_client.get_paginator("list_objects_v2")

    list_params = {"Bucket": bucket_name, "Prefix": prefix}
    if delimiter:
        list_params["Delimiter"] = delimiter

    try:
        pages = paginator.paginate(**list_params)  # type: ignore
        found_contents = False

        for page in pages:
            if delimiter and "CommonPrefixes" in page:
                for common_prefix in page.get("CommonPrefixes", []):
                    found_contents = True
                    folder_name = common_prefix["Prefix"][len(prefix) :].rstrip("/")
                    print(f"📁 {folder_name}/")

            for obj in page.get("Contents", []):
                found_contents = True
                file_path = obj["Key"][len(prefix) :]
                if file_path:
                    size_mb = obj["Size"] / (1024 * 1024)
                    print(f"📄 {file_path}")
                    print(f"   Size: {size_mb:.2f} MB")
                    print(f"   Last Modified: {obj['LastModified']}")

        if not found_contents:
            print("No contents found in the specified path.")

    except Exception as e:
        print(f"Error accessing contents: {str(e)}")


def main():
    parser = argparse.ArgumentParser(description="S3 Content Manager")
    parser.add_argument(
        "s3_path", help="S3 path to operate on (e.g., s3://bucket/prefix)"
    )
    parser.add_argument(
        "--list", action="store_true", help="List all folders and files"
    )
    parser.add_argument("--nuke", action="store_true", help="Delete all contents")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be deleted without actually deleting",
    )
    parser.add_argument(
        "--region", default="us-east-1", help="AWS region (default: us-east-1)"
    )

    args = parser.parse_args()

    # Get environment variables
    role_arn = os.environ.get("DH_ICEBERG_AWS_ROLE")
    client_id = os.environ.get("DH_ICEBERG_CLIENT_ID")
    client_secret = os.environ.get("DH_ICEBERG_CLIENT_SECRET")

    if not all([role_arn, client_id, client_secret]):
        raise ValueError(
            "Missing required environment variables. Please set DH_ICEBERG_AWS_ROLE, "
            "DH_ICEBERG_CLIENT_ID, and DH_ICEBERG_CLIENT_SECRET"
        )

    # Validate arguments
    if sum([args.list, args.nuke, args.dry_run]) != 1:
        parser.error("Please specify exactly one action: --list, --nuke, or --dry-run")

    list_s3_contents(
        args.s3_path,
        client_id=client_id,  # type: ignore
        client_secret=client_secret,  # type: ignore
        role_arn=role_arn,  # type: ignore
        region=args.region,
        # delimiter='/' if args.list else None,
        nuke=args.nuke,
        dry_run=args.dry_run,
    )


if __name__ == "__main__":
    main()