From 148d7f47ecea98d2a8e7779a45379faf14049752 Mon Sep 17 00:00:00 2001
From: Onkar Ravgan
Date: Tue, 26 Apr 2022 17:40:47 +0530
Subject: [PATCH] Refactor DBT for schema changes (#4497)

Refactor DBT for schema changes (#4497)
---
 .../databaseServiceMetadataPipeline.json      |  75 +++++++----
 .../metadata/ingestion/source/sql_source.py   | 121 ++----------------
 ingestion/src/metadata/utils/dbt_config.py    | 109 ++++++++++++++++
 ingestion/src/metadata/utils/helpers.py       |  34 -----
 4 files changed, 168 insertions(+), 171 deletions(-)
 create mode 100644 ingestion/src/metadata/utils/dbt_config.py

diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json
index 41865f41a3e..0dff1b44db1 100644
--- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json
+++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json
@@ -10,12 +10,13 @@
       "enum": ["DatabaseMetadata"],
       "default": "DatabaseMetadata"
     },
-    "dbtConfig": {
+    "dbtLocalConfig": {
+      "title": "DBT Local Config Source",
       "description": "DBT Catalog and Manifest file path config.",
       "type": "object",
       "properties": {
         "dbtCatalogFilePath": {
-          "description": "DBT catalog file to extract dbt models with their column schemas.",
+          "description": "DBT catalog file path to extract dbt models with their column schemas.",
           "type": "string"
         },
         "dbtManifestFilePath": {
@@ -25,6 +26,43 @@
       },
       "additionalProperties": false,
       "required": ["dbtCatalogFilePath", "dbtManifestFilePath"]
+    },
+    "dbtHttpConfig": {
+      "title": "DBT HTTP Config Source",
+      "description": "DBT Catalog and Manifest HTTP path configuration.",
+      "type": "object",
+      "properties": {
+        "dbtCatalogHttpPath": {
+          "description": "DBT catalog http file path to extract dbt models with their column schemas.",
+          "type": "string"
+        },
+        "dbtManifestHttpPath": {
+          "description": "DBT manifest http file path to extract dbt models and associate with tables.",
+          "type": "string"
+        }
+      },
+      "additionalProperties": false,
+      "required": ["dbtCatalogHttpPath", "dbtManifestHttpPath"]
+    },
+    "dbtS3Config": {
+      "title": "DBT S3 Config Source",
+      "description": "DBT Catalog and Manifest files in S3 bucket. We will search for catalog.json and manifest.json.",
+      "properties": {
+        "dbtSecurityConfig": {
+          "title": "DBT S3 Security Config",
+          "$ref": "../security/credentials/awsCredentials.json"
+        }
+      }
+    },
+    "dbtGCSConfig": {
+      "title": "DBT GCS Config Source",
+      "description": "DBT Catalog and Manifest files in GCS storage. We will search for catalog.json and manifest.json.",
+      "properties": {
+        "dbtSecurityConfig": {
+          "title": "DBT GCS Security Config",
+          "$ref": "../security/credentials/gcsCredentials.json"
+        }
+      }
     }
   },
   "properties": {
@@ -71,34 +109,23 @@
       "description": "Regex exclude tables or databases that matches the pattern.",
       "$ref": "../type/filterPattern.json#/definitions/filterPattern"
     },
-    "dbtProvider": {
-      "description": "Method from which the DBT files will be fetched. Accepted values are: 's3'(Required aws s3 credentials to be provided), 'gcs'(Required gcs credentials to be provided), 'gcs-path'(path of the file containing gcs credentials), 'local'(path of dbt files on local system), 'http'(url path of dbt files).",
-      "type": "string",
-      "enum": ["local", "http", "gcs", "gcs-path", "s3"]
-    },
-    "dbtConfig": {
-      "$ref": "#/definitions/dbtConfig"
-    },
-    "dbtSecurityConfig": {
-      "description": "DBT configuration.",
+    "dbtConfigSource": {
+      "title": "DBT Configuration Source",
+      "description": "Available sources to fetch DBT catalog and manifest files.",
       "oneOf": [
         {
-          "$ref": "../security/credentials/gcsCredentials.json"
+          "$ref": "#/definitions/dbtLocalConfig"
         },
         {
-          "$ref": "../security/credentials/awsCredentials.json"
+          "$ref": "#/definitions/dbtHttpConfig"
+        },
+        {
+          "$ref": "#/definitions/dbtS3Config"
+        },
+        {
+          "$ref": "#/definitions/dbtGCSConfig"
         }
       ]
-    },
-    "dbtCatalogFileName": {
-      "description": "DBT Catalog file name",
-      "type": "string",
-      "default": "catalog.json"
-    },
-    "dbtManifestFileName": {
-      "description": "DBT Manifest file name",
-      "type": "string",
-      "default": "manifest.json"
     }
   },
   "additionalProperties": false
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 7005f3cbb9f..68edf853f45 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -11,10 +11,9 @@
 """
 Generic source to build SQL connectors.
 """
-import json
+
 import re
 import traceback
-import urllib.request
 import uuid
 from dataclasses import dataclass, field
 from datetime import datetime
@@ -69,9 +68,9 @@
 from metadata.utils.connections import (
     get_connection,
     test_connection,
 )
+from metadata.utils.dbt_config import get_dbt_details
 from metadata.utils.filters import filter_by_schema, filter_by_table
 from metadata.utils.fqdn_generator import get_fqdn
-from metadata.utils.helpers import store_gcs_credentials
 
 logger = ometa_logger()
@@ -108,114 +107,6 @@ def _get_table_description(schema: str, table: str, inspector: Inspector) -> str
     return description
 
 
-def get_dbt_local(config) -> Optional[Tuple[str, str]]:
-    try:
-        if config.dbtConfig.dbtCatalogFilePath is not None:
-            with open(
-                config.dbtConfig.dbtCatalogFilePath, "r", encoding="utf-8"
-            ) as catalog:
-                dbt_catalog = catalog.read()
-        if config.dbtConfig.dbtManifestFilePath is not None:
-            with open(
-                config.dbtConfig.dbtManifestFilePath, "r", encoding="utf-8"
-            ) as manifest:
-                dbt_manifest = manifest.read()
-        return json.loads(dbt_catalog), json.loads(dbt_manifest)
-    except Exception as exc:
-        logger.debug(traceback.format_exc())
-        logger.debug(f"Error fetching dbt files from local {repr(exc)}")
-    return None
-
-
-def get_dbt_http(config) -> Optional[Tuple[str, str]]:
-    try:
-        catalog_file = urllib.request.urlopen(config.dbtConfig.dbtCatalogFilePath)
-        manifest_file = urllib.request.urlopen(config.dbtConfig.dbtManifestFilePath)
-        dbt_catalog = catalog_file.read().decode()
-        dbt_manifest = manifest_file.read().decode()
-        return json.loads(dbt_catalog), json.loads(dbt_manifest)
-    except Exception as exc:
-        logger.debug(traceback.format_exc())
-        logger.debug(f"Error fetching dbt files from http {repr(exc)}")
-    return None
-
-
-def get_dbt_gcs(config) -> Optional[Tuple[str, str]]:
-    try:
-        dbt_options = config.dbtSecurityConfig.gcsConfig
-        if config.dbtProvider.value == "gcs":
-            options = {
-                "credentials": {
-                    "type": dbt_options.type,
-                    "project_id": dbt_options.projectId,
- "private_key_id": dbt_options.privateKeyId, - "private_key": dbt_options.privateKey, - "client_email": dbt_options.clientEmail, - "client_id": dbt_options.clientId, - "auth_uri": dbt_options.authUri, - "token_uri": dbt_options.tokenUri, - "auth_provider_x509_cert_url": dbt_options.authProviderX509CertUrl, - "client_x509_cert_url": dbt_options.clientX509CertUrl, - } - } - else: - options = {"credentials_path": dbt_options.__root__} - if store_gcs_credentials(options): - from google.cloud import storage - - client = storage.Client() - for bucket in client.list_buckets(): - for blob in client.list_blobs(bucket.name): - if config.dbtManifestFileName in blob.name: - dbt_manifest = blob.download_as_string().decode() - if config.dbtCatalogFileName in blob.name: - dbt_catalog = blob.download_as_string().decode() - return json.loads(dbt_catalog), json.loads(dbt_manifest) - else: - return None - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.debug(f"Error fetching dbt files from gcs {repr(exc)}") - return None - - -def get_dbt_s3(config) -> Optional[Tuple[str, str]]: - try: - from metadata.utils.aws_client import AWSClient - - aws_client = AWSClient(config.dbtSecurityConfig).get_resource("s3") - buckets = aws_client.buckets.all() - for bucket in buckets: - for bucket_object in bucket.objects.all(): - if config.dbtManifestFileName in bucket_object.key: - dbt_manifest = bucket_object.get()["Body"].read().decode() - if config.dbtCatalogFileName in bucket_object.key: - dbt_catalog = bucket_object.get()["Body"].read().decode() - return json.loads(dbt_catalog), json.loads(dbt_manifest) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.debug(f"Error fetching dbt files from s3 {repr(exc)}") - return None - - -def get_dbt_details(config) -> Optional[Tuple[str, str]]: - try: - if config.dbtProvider: - if config.dbtProvider.value == "s3": - return get_dbt_s3(config) - if "gcs" in config.dbtProvider.value: - return get_dbt_gcs(config) - if config.dbtProvider.value == "http": - return get_dbt_http(config) - if config.dbtProvider.value == "local": - return get_dbt_local(config) - raise Exception("Invalid DBT provider") - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.debug(f"Error fetching dbt files {repr(exc)}") - return None - - class SQLSource(Source[OMetaDatabaseAndTable]): """ Source Connector implementation to extract @@ -256,7 +147,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): self.data_models = {} self.table_constraints = None self.database_source_state = set() - dbt_details = get_dbt_details(self.config.sourceConfig.config) + dbt_details = get_dbt_details(self.config.sourceConfig.config.dbtConfigSource) self.dbt_catalog = dbt_details[0] if dbt_details else None self.dbt_manifest = dbt_details[1] if dbt_details else None self.profile_date = datetime.now() @@ -565,7 +456,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]): """ Get all the DBT information and feed it to the Table Entity """ - if self.source_config.dbtProvider and self.dbt_catalog and self.dbt_manifest: + if ( + self.source_config.dbtConfigSource + and self.dbt_manifest + and self.dbt_catalog + ): logger.info("Parsing Data Models") manifest_entities = { **self.dbt_manifest["nodes"], diff --git a/ingestion/src/metadata/utils/dbt_config.py b/ingestion/src/metadata/utils/dbt_config.py new file mode 100644 index 00000000000..acc3531b04c --- /dev/null +++ b/ingestion/src/metadata/utils/dbt_config.py @@ -0,0 +1,109 @@ +# Copyright 2021 Collate +# Licensed under 
the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Hosts the singledispatch to get DBT files +""" +import json +import traceback +import urllib.request +from functools import singledispatch + +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DbtGCSConfig, + DbtHttpConfig, + DbtLocalConfig, + DbtS3Config, +) +from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.credentials import set_google_credentials + +logger = ometa_logger() + +DBT_CATALOG_FILE_NAME = "catalog.json" +DBT_MANIFEST_FILE_NAME = "manifest.json" + + +@singledispatch +def get_dbt_details(config): + if config: + raise NotImplementedError( + f"Config not implemented for type {type(config)}: {config}" + ) + + +@get_dbt_details.register +def _(config: DbtLocalConfig): + try: + if config.dbtCatalogFilePath is not None: + with open(config.dbtCatalogFilePath, "r", encoding="utf-8") as catalog: + dbt_catalog = catalog.read() + if config.dbtManifestFilePath is not None: + with open(config.dbtManifestFilePath, "r", encoding="utf-8") as manifest: + dbt_manifest = manifest.read() + return json.loads(dbt_catalog), json.loads(dbt_manifest) + except Exception as exc: + logger.error(traceback.format_exc()) + logger.error(f"Error fetching dbt files from local {repr(exc)}") + return None + + +@get_dbt_details.register +def _(config: DbtHttpConfig): + try: + catalog_file = urllib.request.urlopen(config.dbtCatalogHttpPath) + manifest_file = urllib.request.urlopen(config.dbtManifestHttpPath) + dbt_catalog = catalog_file.read().decode() + dbt_manifest = manifest_file.read().decode() + return json.loads(dbt_catalog), json.loads(dbt_manifest) + except Exception as exc: + logger.error(traceback.format_exc()) + logger.error(f"Error fetching dbt files from file server {repr(exc)}") + return None + + +@get_dbt_details.register +def _(config: DbtS3Config): + try: + from metadata.utils.aws_client import AWSClient + + aws_client = AWSClient(config.dbtSecurityConfig).get_resource("s3") + buckets = aws_client.buckets.all() + for bucket in buckets: + for bucket_object in bucket.objects.all(): + if DBT_MANIFEST_FILE_NAME in bucket_object.key: + dbt_manifest = bucket_object.get()["Body"].read().decode() + if DBT_CATALOG_FILE_NAME in bucket_object.key: + dbt_catalog = bucket_object.get()["Body"].read().decode() + return json.loads(dbt_catalog), json.loads(dbt_manifest) + except Exception as exc: + logger.error(traceback.format_exc()) + logger.error(f"Error fetching dbt files from s3 {repr(exc)}") + return None + + +@get_dbt_details.register +def _(config: DbtGCSConfig): + try: + from google.cloud import storage + + set_google_credentials(gcs_credentials=config.dbtSecurityConfig) + client = storage.Client() + for bucket in client.list_buckets(): + for blob in client.list_blobs(bucket.name): + if DBT_MANIFEST_FILE_NAME in blob.name: + dbt_manifest = blob.download_as_string().decode() + if DBT_CATALOG_FILE_NAME in blob.name: + dbt_catalog = blob.download_as_string().decode() + return json.loads(dbt_catalog), 
json.loads(dbt_manifest) + except Exception as exc: + logger.error(traceback.format_exc()) + logger.error(f"Error fetching dbt files from gcs {repr(exc)}") + return None diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index 2b07e79d35d..67fe147f5b7 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -9,10 +9,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import logging -import os -import tempfile from datetime import datetime, timedelta from typing import Any, Dict, Iterable @@ -214,34 +211,3 @@ def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]: rows = alchemy_helper.execute_query() for row in rows: yield row - - -def create_credential_temp_file(credentials: dict) -> str: - with tempfile.NamedTemporaryFile(delete=False) as fp: - cred_json = json.dumps(credentials, indent=4, separators=(",", ": ")) - fp.write(cred_json.encode()) - return fp.name - - -def store_gcs_credentials(options) -> bool: - """ - Method to store GCS credentials from the config file or from a file whose path is provided - into the environment variable as required by GCS - """ - if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"): - if options.get("credentials_path"): - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options["credentials_path"] - del options["credentials_path"] - elif options.get("credentials"): - temp_credentials = create_credential_temp_file( - credentials=options.get("credentials") - ) - os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_credentials - del options["credentials"] - else: - logger.warning( - "Please refer to the Google Cloud Storage documentation, especially the credentials part" - "https://cloud.google.com/storage/docs/authentication" - ) - return False - return True
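
Usage sketch (illustrative, not part of the commit): with the schema and singledispatch changes above, callers build one of the generated config models and hand it to get_dbt_details, which routes the call by config type. The sketch assumes the generated DbtLocalConfig pydantic model mirrors the dbtLocalConfig definition added to the JSON schema; the file paths are placeholders.

    # Illustrative only; paths below are placeholders.
    from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
        DbtLocalConfig,
    )
    from metadata.utils.dbt_config import get_dbt_details

    config = DbtLocalConfig(
        dbtCatalogFilePath="/path/to/catalog.json",
        dbtManifestFilePath="/path/to/manifest.json",
    )

    # singledispatch picks the handler registered for DbtLocalConfig;
    # an unregistered config type raises NotImplementedError.
    dbt_details = get_dbt_details(config)
    if dbt_details:
        dbt_catalog, dbt_manifest = dbt_details

SQLSource now feeds self.config.sourceConfig.config.dbtConfigSource into this same entry point, so supporting a new artifact store only requires registering another handler with @get_dbt_details.register.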