Added prefix while parsing bucket objects (#4665)

* Added prefix to s3 bucket objects

* Added support for prefixes in gcs and s3 for DBT

* Implemented review comments

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-05-05 10:49:10 +05:30 committed by GitHub
parent 4aa5fbce04
commit 6f97b25175
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 4 deletions

View File

@ -10,6 +10,24 @@
"enum": ["DatabaseMetadata"],
"default": "DatabaseMetadata"
},
"dbtBucketDetails": {
"title": "DBT Bucket Details",
"description": "Details of the bucket where the dbt files are stored",
"type": "object",
"properties": {
"dbtBucketName": {
"title": "DBT Bucket Name",
"description": "Name of the bucket where the dbt files are stored",
"type": "string"
},
"dbtObjectPrefix": {
"title": "DBT Object Prefix",
"description": "Path of the folder where the dbt files are stored",
"type": "string"
}
},
"additionalProperties": false
},
"dbtLocalConfig": {
"title": "DBT Local Config Source",
"description": "DBT Catalog and Manifest file path config.",
@ -51,6 +69,10 @@
"dbtSecurityConfig": {
"title": "DBT S3 Security Config",
"$ref": "../security/credentials/awsCredentials.json"
},
"dbtPrefixConfig": {
"title": "DBT Prefix Config",
"$ref": "#/definitions/dbtBucketDetails"
}
}
},
@ -61,6 +83,10 @@
"dbtSecurityConfig": {
"title": "DBT GCS Security Config",
"$ref": "../security/credentials/gcsCredentials.json"
},
"dbtPrefixConfig": {
"title": "DBT Prefix Config",
"$ref": "#/definitions/dbtBucketDetails"
}
}
}

View File

@ -15,6 +15,7 @@ import json
import traceback
import urllib.request
from functools import singledispatch
from typing import Optional, Tuple
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DbtGCSConfig,
@ -72,12 +73,21 @@ def _(config: DbtHttpConfig):
@get_dbt_details.register
def _(config: DbtS3Config):
try:
bucket_name, prefix = get_dbt_prefix_config(config)
from metadata.utils.aws_client import AWSClient
aws_client = AWSClient(config.dbtSecurityConfig).get_resource("s3")
buckets = aws_client.buckets.all()
if not bucket_name:
buckets = aws_client.buckets.all()
else:
buckets = [aws_client.Bucket(bucket_name)]
for bucket in buckets:
for bucket_object in bucket.objects.all():
if prefix:
obj_list = bucket.objects.filter(Prefix=prefix)
else:
obj_list = bucket.objects.all()
for bucket_object in obj_list:
if DBT_MANIFEST_FILE_NAME in bucket_object.key:
dbt_manifest = bucket_object.get()["Body"].read().decode()
if DBT_CATALOG_FILE_NAME in bucket_object.key:
@ -92,12 +102,21 @@ def _(config: DbtS3Config):
@get_dbt_details.register
def _(config: DbtGCSConfig):
try:
bucket_name, prefix = get_dbt_prefix_config(config)
from google.cloud import storage
set_google_credentials(gcs_credentials=config.dbtSecurityConfig)
client = storage.Client()
for bucket in client.list_buckets():
for blob in client.list_blobs(bucket.name):
if not bucket_name:
buckets = client.list_buckets()
else:
buckets = [client.get_bucket(bucket_name)]
for bucket in buckets:
if prefix:
obj_list = client.list_blobs(bucket.name, prefix=prefix)
else:
obj_list = client.list_blobs(bucket.name)
for blob in obj_list:
if DBT_MANIFEST_FILE_NAME in blob.name:
dbt_manifest = blob.download_as_string().decode()
if DBT_CATALOG_FILE_NAME in blob.name:
@ -107,3 +126,15 @@ def _(config: DbtGCSConfig):
logger.error(traceback.format_exc())
logger.error(f"Error fetching dbt files from gcs {repr(exc)}")
return None
def get_dbt_prefix_config(config) -> Tuple[Optional[str], Optional[str]]:
    """
    Return the (bucket name, object prefix) pair configured for a DBT source.

    Args:
        config: DBT source config object exposing a ``dbtPrefixConfig``
            attribute.

    Returns:
        ``(dbtBucketName, dbtObjectPrefix)`` as read from
        ``config.dbtPrefixConfig``, or ``(None, None)`` when
        ``dbtPrefixConfig`` is unset or otherwise falsy. Either element of
        the tuple may itself be ``None`` if that field was not provided.
    """
    # Read the attribute once so the check and the field access agree.
    prefix_config = config.dbtPrefixConfig
    if not prefix_config:
        return None, None
    return prefix_config.dbtBucketName, prefix_config.dbtObjectPrefix