Refactor DBT for schema changes (#4497)

Onkar Ravgan authored on 2022-04-26 17:40:47 +05:30; committed by GitHub
parent 897e4ff765
commit 148d7f47ec
4 changed files with 168 additions and 171 deletions

databaseServiceMetadataPipeline.json

@@ -10,12 +10,13 @@
"enum": ["DatabaseMetadata"],
"default": "DatabaseMetadata"
},
"dbtConfig": {
"dbtLocalConfig": {
"title": "DBT Local Config Source",
"description": "DBT Catalog and Manifest file path config.",
"type": "object",
"properties": {
"dbtCatalogFilePath": {
"description": "DBT catalog file to extract dbt models with their column schemas.",
"description": "DBT catalog file path to extract dbt models with their column schemas.",
"type": "string"
},
"dbtManifestFilePath": {
@@ -25,6 +26,43 @@
},
"additionalProperties": false,
"required": ["dbtCatalogFilePath", "dbtManifestFilePath"]
},
"dbtHttpConfig": {
"title": "DBT HTTP Config Source",
"description": "DBT Catalog and Manifest HTTP path configuration.",
"type": "object",
"properties": {
"dbtCatalogHttpPath": {
"description": "DBT catalog http file path to extract dbt models with their column schemas.",
"type": "string"
},
"dbtManifestHttpPath": {
"description": "DBT manifest http file path to extract dbt models and associate with tables.",
"type": "string"
}
},
"additionalProperties": false,
"required": ["dbtCatalogHttpPath", "dbtManifestHttpPath"]
},
"dbtS3Config": {
"title": "DBT S3 Config Source",
"description": "DBT Catalog and Manifest files in S3 bucket. We will search for catalog.json and manifest.json.",
"properties": {
"dbtSecurityConfig": {
"title": "DBT S3 Security Config",
"$ref": "../security/credentials/awsCredentials.json"
}
}
},
"dbtGCSConfig": {
"title": "DBT GCS Config Source",
"description": "DBT Catalog and Manifest files in GCS storage. We will search for catalog.json and manifest.json.",
"properties": {
"dbtSecurityConfig": {
"title": "DBT GCS Security Config",
"$ref": "../security/credentials/gcsCredentials.json"
}
}
}
},
"properties": {
@@ -71,34 +109,23 @@
"description": "Regex exclude tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"dbtProvider": {
"description": "Method from which the DBT files will be fetched. Accepted values are: 's3'(Required aws s3 credentials to be provided), 'gcs'(Required gcs credentials to be provided), 'gcs-path'(path of the file containing gcs credentials), 'local'(path of dbt files on local system), 'http'(url path of dbt files).",
"type": "string",
"enum": ["local", "http", "gcs", "gcs-path", "s3"]
},
"dbtConfig": {
"$ref": "#/definitions/dbtConfig"
},
"dbtSecurityConfig": {
"description": "DBT configuration.",
"dbtConfigSource": {
"title": "DBT Configuration Source",
"description": "Available sources to fetch DBT catalog and manifest files.",
"oneOf": [
{
"$ref": "../security/credentials/gcsCredentials.json"
"$ref": "#/definitions/dbtLocalConfig"
},
{
"$ref": "../security/credentials/awsCredentials.json"
"$ref": "#/definitions/dbtHttpConfig"
},
{
"$ref": "#/definitions/dbtS3Config"
},
{
"$ref": "#/definitions/dbtGCSConfig"
}
]
},
"dbtCatalogFileName": {
"description": "DBT Catalog file name",
"type": "string",
"default": "catalog.json"
},
"dbtManifestFileName": {
"description": "DBT Manifest file name",
"type": "string",
"default": "manifest.json"
}
},
"additionalProperties": false

metadata/ingestion/source/sql_source.py

@@ -11,10 +11,9 @@
"""
Generic source to build SQL connectors.
"""
import json
import re
import traceback
import urllib.request
import uuid
from dataclasses import dataclass, field
from datetime import datetime
@@ -69,9 +68,9 @@ from metadata.utils.connections import (
get_connection,
test_connection,
)
from metadata.utils.dbt_config import get_dbt_details
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.fqdn_generator import get_fqdn
from metadata.utils.helpers import store_gcs_credentials
logger = ometa_logger()
@@ -108,114 +107,6 @@ def _get_table_description(schema: str, table: str, inspector: Inspector) -> str
return description
def get_dbt_local(config) -> Optional[Tuple[str, str]]:
try:
if config.dbtConfig.dbtCatalogFilePath is not None:
with open(
config.dbtConfig.dbtCatalogFilePath, "r", encoding="utf-8"
) as catalog:
dbt_catalog = catalog.read()
if config.dbtConfig.dbtManifestFilePath is not None:
with open(
config.dbtConfig.dbtManifestFilePath, "r", encoding="utf-8"
) as manifest:
dbt_manifest = manifest.read()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files from local {repr(exc)}")
return None
def get_dbt_http(config) -> Optional[Tuple[str, str]]:
try:
catalog_file = urllib.request.urlopen(config.dbtConfig.dbtCatalogFilePath)
manifest_file = urllib.request.urlopen(config.dbtConfig.dbtManifestFilePath)
dbt_catalog = catalog_file.read().decode()
dbt_manifest = manifest_file.read().decode()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files from http {repr(exc)}")
return None
def get_dbt_gcs(config) -> Optional[Tuple[str, str]]:
try:
dbt_options = config.dbtSecurityConfig.gcsConfig
if config.dbtProvider.value == "gcs":
options = {
"credentials": {
"type": dbt_options.type,
"project_id": dbt_options.projectId,
"private_key_id": dbt_options.privateKeyId,
"private_key": dbt_options.privateKey,
"client_email": dbt_options.clientEmail,
"client_id": dbt_options.clientId,
"auth_uri": dbt_options.authUri,
"token_uri": dbt_options.tokenUri,
"auth_provider_x509_cert_url": dbt_options.authProviderX509CertUrl,
"client_x509_cert_url": dbt_options.clientX509CertUrl,
}
}
else:
options = {"credentials_path": dbt_options.__root__}
if store_gcs_credentials(options):
from google.cloud import storage
client = storage.Client()
for bucket in client.list_buckets():
for blob in client.list_blobs(bucket.name):
if config.dbtManifestFileName in blob.name:
dbt_manifest = blob.download_as_string().decode()
if config.dbtCatalogFileName in blob.name:
dbt_catalog = blob.download_as_string().decode()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
else:
return None
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files from gcs {repr(exc)}")
return None
def get_dbt_s3(config) -> Optional[Tuple[str, str]]:
try:
from metadata.utils.aws_client import AWSClient
aws_client = AWSClient(config.dbtSecurityConfig).get_resource("s3")
buckets = aws_client.buckets.all()
for bucket in buckets:
for bucket_object in bucket.objects.all():
if config.dbtManifestFileName in bucket_object.key:
dbt_manifest = bucket_object.get()["Body"].read().decode()
if config.dbtCatalogFileName in bucket_object.key:
dbt_catalog = bucket_object.get()["Body"].read().decode()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files from s3 {repr(exc)}")
return None
def get_dbt_details(config) -> Optional[Tuple[str, str]]:
try:
if config.dbtProvider:
if config.dbtProvider.value == "s3":
return get_dbt_s3(config)
if "gcs" in config.dbtProvider.value:
return get_dbt_gcs(config)
if config.dbtProvider.value == "http":
return get_dbt_http(config)
if config.dbtProvider.value == "local":
return get_dbt_local(config)
raise Exception("Invalid DBT provider")
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files {repr(exc)}")
return None
class SQLSource(Source[OMetaDatabaseAndTable]):
"""
Source Connector implementation to extract
@@ -256,7 +147,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.data_models = {}
self.table_constraints = None
self.database_source_state = set()
dbt_details = get_dbt_details(self.config.sourceConfig.config)
dbt_details = get_dbt_details(self.config.sourceConfig.config.dbtConfigSource)
self.dbt_catalog = dbt_details[0] if dbt_details else None
self.dbt_manifest = dbt_details[1] if dbt_details else None
self.profile_date = datetime.now()
@@ -565,7 +456,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
"""
Get all the DBT information and feed it to the Table Entity
"""
if self.source_config.dbtProvider and self.dbt_catalog and self.dbt_manifest:
if (
self.source_config.dbtConfigSource
and self.dbt_manifest
and self.dbt_catalog
):
logger.info("Parsing Data Models")
manifest_entities = {
**self.dbt_manifest["nodes"],

metadata/utils/dbt_config.py

@@ -0,0 +1,109 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hosts the singledispatch to get DBT files
"""
import json
import traceback
import urllib.request
from functools import singledispatch
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DbtGCSConfig,
DbtHttpConfig,
DbtLocalConfig,
DbtS3Config,
)
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.credentials import set_google_credentials
logger = ometa_logger()
DBT_CATALOG_FILE_NAME = "catalog.json"
DBT_MANIFEST_FILE_NAME = "manifest.json"
@singledispatch
def get_dbt_details(config):
if config:
raise NotImplementedError(
f"Config not implemented for type {type(config)}: {config}"
)
@get_dbt_details.register
def _(config: DbtLocalConfig):
try:
if config.dbtCatalogFilePath is not None:
with open(config.dbtCatalogFilePath, "r", encoding="utf-8") as catalog:
dbt_catalog = catalog.read()
if config.dbtManifestFilePath is not None:
with open(config.dbtManifestFilePath, "r", encoding="utf-8") as manifest:
dbt_manifest = manifest.read()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.error(traceback.format_exc())
logger.error(f"Error fetching dbt files from local {repr(exc)}")
return None
@get_dbt_details.register
def _(config: DbtHttpConfig):
try:
catalog_file = urllib.request.urlopen(config.dbtCatalogHttpPath)
manifest_file = urllib.request.urlopen(config.dbtManifestHttpPath)
dbt_catalog = catalog_file.read().decode()
dbt_manifest = manifest_file.read().decode()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.error(traceback.format_exc())
logger.error(f"Error fetching dbt files from file server {repr(exc)}")
return None
@get_dbt_details.register
def _(config: DbtS3Config):
try:
from metadata.utils.aws_client import AWSClient
aws_client = AWSClient(config.dbtSecurityConfig).get_resource("s3")
buckets = aws_client.buckets.all()
for bucket in buckets:
for bucket_object in bucket.objects.all():
if DBT_MANIFEST_FILE_NAME in bucket_object.key:
dbt_manifest = bucket_object.get()["Body"].read().decode()
if DBT_CATALOG_FILE_NAME in bucket_object.key:
dbt_catalog = bucket_object.get()["Body"].read().decode()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.error(traceback.format_exc())
logger.error(f"Error fetching dbt files from s3 {repr(exc)}")
return None
@get_dbt_details.register
def _(config: DbtGCSConfig):
try:
from google.cloud import storage
set_google_credentials(gcs_credentials=config.dbtSecurityConfig)
client = storage.Client()
for bucket in client.list_buckets():
for blob in client.list_blobs(bucket.name):
if DBT_MANIFEST_FILE_NAME in blob.name:
dbt_manifest = blob.download_as_string().decode()
if DBT_CATALOG_FILE_NAME in blob.name:
dbt_catalog = blob.download_as_string().decode()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.error(traceback.format_exc())
logger.error(f"Error fetching dbt files from gcs {repr(exc)}")
return None
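
Since get_dbt_details is a functools.singledispatch function, the fetch strategy is chosen by the type of the config object rather than by the old dbtProvider string enum. A minimal usage sketch, assuming the generated DbtLocalConfig model accepts its schema fields as keyword arguments and that the placeholder paths exist:

from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
    DbtLocalConfig,
)
from metadata.utils.dbt_config import get_dbt_details

# Passing a DbtLocalConfig instance dispatches to the local-file handler
# registered above; an S3, GCS, or HTTP config would route to its own handler.
config = DbtLocalConfig(
    dbtCatalogFilePath="/tmp/catalog.json",    # placeholder path
    dbtManifestFilePath="/tmp/manifest.json",  # placeholder path
)
dbt_details = get_dbt_details(config)
if dbt_details:
    dbt_catalog, dbt_manifest = dbt_details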

metadata/utils/helpers.py

@@ -9,10 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import tempfile
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable
@@ -214,34 +211,3 @@ def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]:
rows = alchemy_helper.execute_query()
for row in rows:
yield row
def create_credential_temp_file(credentials: dict) -> str:
with tempfile.NamedTemporaryFile(delete=False) as fp:
cred_json = json.dumps(credentials, indent=4, separators=(",", ": "))
fp.write(cred_json.encode())
return fp.name
def store_gcs_credentials(options) -> bool:
"""
Method to store GCS credentials from the config file or from a file whose path is provided
into the environment variable as required by GCS
"""
if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
if options.get("credentials_path"):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options["credentials_path"]
del options["credentials_path"]
elif options.get("credentials"):
temp_credentials = create_credential_temp_file(
credentials=options.get("credentials")
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_credentials
del options["credentials"]
else:
logger.warning(
"Please refer to the Google Cloud Storage documentation, especially the credentials part"
"https://cloud.google.com/storage/docs/authentication"
)
return False
return True
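
For reference, the pattern these removed helpers implemented, writing an in-memory credentials dict to a temporary file and pointing GOOGLE_APPLICATION_CREDENTIALS at it, now lives behind set_google_credentials in metadata.utils.credentials (imported by the new dbt_config.py above). A minimal, self-contained sketch of that pattern; export_google_credentials is an illustrative name, not part of this codebase:

import json
import os
import tempfile

def export_google_credentials(credentials: dict) -> None:
    # Respect an environment that is already configured.
    if os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
        return
    # Materialize the in-memory credentials as a JSON file on disk...
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False
    ) as temp_file:
        json.dump(credentials, temp_file, indent=4)
    # ...and expose it through the variable the Google clients read.
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name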