diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json
index 4310539d421..a5624f4988d 100644
--- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json
+++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json
@@ -3,6 +3,24 @@
   "$schema": "http://json-schema.org/draft-07/schema#",
   "title": "DatabaseServiceMetadataPipeline",
   "description": "DatabaseService Metadata Pipeline Configuration.",
+  "definitions": {
+    "localHttpDBTConfig": {
+      "description": "Local and HTTP DBT configs.",
+      "type": "object",
+      "properties": {
+        "dbtCatalogFilePath": {
+          "description": "DBT catalog file to extract dbt models with their column schemas.",
+          "type": "string"
+        },
+        "dbtManifestFilePath": {
+          "description": "DBT manifest file path to extract dbt models and associate with tables.",
+          "type": "string"
+        }
+      },
+      "additionalProperties": false,
+      "required": ["dbtCatalogFilePath", "dbtManifestFilePath"]
+    }
+  },
   "properties": {
     "markDeletedTables": {
       "description": "Optional configuration to soft delete tables in OpenMetadata if the source tables are deleted.",
@@ -42,13 +60,34 @@
       "description": "Regex exclude tables or databases that matches the pattern.",
       "$ref": "../type/filterPattern.json#/definitions/filterPattern"
     },
-    "dbtCatalogFilePath": {
-      "description": "DBT catalog file to extract dbt models with their column schemas.",
-      "type": "string"
+    "dbtProvider": {
+      "description": "Method from which the DBT files will be fetched. Accepted values are: 's3'(Required aws s3 credentials to be provided), 'gcs'(Required gcs credentials to be provided), 'gcs-path'(path of the file containing gcs credentials), 'local'(path of dbt files on local system), 'http'(url path of dbt files).",
+      "type": "string",
+      "enum": ["local", "http", "gcs", "gcs-path", "s3"]
     },
-    "dbtManifestFilePath": {
-      "description": "DBT manifest file path to extract dbt models and associate with tables.",
-      "type": "string"
+    "dbtConfig": {
+      "description": "DBT configuration.",
+      "oneOf": [
+        {
+          "$ref": "#/definitions/localHttpDBTConfig"
+        },
+        {
+          "$ref": "../security/credentials/gcsCredentials.json"
+        },
+        {
+          "$ref": "../security/credentials/s3Credentials.json"
+        }
+      ]
+    },
+    "dbtCatalogFileName": {
+      "description": "DBT Catalog file name",
+      "type": "string",
+      "default": "catalog.json"
+    },
+    "dbtManifestFileName": {
+      "description": "DBT Manifest file name",
+      "type": "string",
+      "default": "manifest.json"
     }
   },
   "additionalProperties": false
diff --git a/catalog-rest-service/src/main/resources/json/schema/security/credentials/gcsCredentials.json b/catalog-rest-service/src/main/resources/json/schema/security/credentials/gcsCredentials.json
index 90c2678c7a3..a845ece3cff 100644
--- a/catalog-rest-service/src/main/resources/json/schema/security/credentials/gcsCredentials.json
+++ b/catalog-rest-service/src/main/resources/json/schema/security/credentials/gcsCredentials.json
@@ -79,4 +79,4 @@
   "required": [
     "gcsConfig"
   ]
-}
\ No newline at end of file
+}
diff --git a/catalog-rest-service/src/main/resources/json/schema/security/credentials/s3Credentials.json b/catalog-rest-service/src/main/resources/json/schema/security/credentials/s3Credentials.json
new file mode 100644
index 00000000000..7fe90b75e6c
--- /dev/null
+++ b/catalog-rest-service/src/main/resources/json/schema/security/credentials/s3Credentials.json
@@ -0,0 +1,33 @@
+{
+  "$id": "https://open-metadata.org/security/credentials/s3Credentials.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "S3Credentials",
+  "description": "AWS S3 credentials configs.",
+  "type": "object",
+  "javaType": "org.openmetadata.catalog.security.credentials.S3Credentials",
+  "properties": {
+    "awsAccessKeyId": {
+      "description": "AWS Access key ID.",
+      "type": "string"
+    },
+    "awsSecretAccessKey": {
+      "description": "AWS Secret Access Key.",
+      "type": "string",
+      "format": "password"
+    },
+    "awsRegion": {
+      "description": "AWS Region",
+      "type": "string"
+    },
+    "awsSessionToken": {
+      "description": "AWS Session Token.",
+      "type": "string"
+    },
+    "endPointURL": {
+      "description": "EndPoint URL for the AWS",
+      "type": "string"
+    }
+  },
+  "additionalProperties": false,
+  "required": ["awsAccessKeyId", "awsSecretAccessKey", "awsRegion"]
+}
diff --git a/ingestion/setup.py b/ingestion/setup.py
index 22ee07ed7fe..740305d0950 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -82,7 +82,7 @@ plugins: Dict[str, Set[str]] = {
     "bigquery-usage": {"google-cloud-logging", "cachetools"},
     "docker": {"python_on_whales==0.34.0"},
     "backup": {"boto3~=1.19.12"},
-    "dbt": {},
+    "dbt": {"google-cloud", "boto3"},
     "druid": {"pydruid>=0.6.2"},
     "elasticsearch": {"elasticsearch==7.13.1"},
     "glue": {"boto3~=1.19.12"},
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 3880b33a23f..8010b531f7c 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -14,6 +14,7 @@ Generic source to build SQL connectors.
 import json
 import re
 import traceback
+import urllib.request
 import uuid
 from dataclasses import dataclass, field
 from datetime import datetime
@@ -66,6 +67,7 @@ from metadata.utils.column_type_parser import ColumnTypeParser
 from metadata.utils.engines import create_and_bind_session, get_engine, test_connection
 from metadata.utils.filters import filter_by_schema, filter_by_table
 from metadata.utils.fqdn_generator import get_fqdn
+from metadata.utils.helpers import store_gcs_credentials
 
 logger = ometa_logger()
 
@@ -102,6 +104,114 @@ def _get_table_description(schema: str, table: str, inspector: Inspector) -> str
     return description
+
+
+def get_dbt_local(config) -> Optional[Tuple[str, str]]:
+    try:
+        if config.dbtConfig.dbtCatalogFilePath is not None:
+            with open(
+                config.dbtConfig.dbtCatalogFilePath, "r", encoding="utf-8"
+            ) as catalog:
+                dbt_catalog = catalog.read()
+        if config.dbtConfig.dbtManifestFilePath is not None:
+            with open(
+                config.dbtConfig.dbtManifestFilePath, "r", encoding="utf-8"
+            ) as manifest:
+                dbt_manifest = manifest.read()
+        return json.loads(dbt_catalog), json.loads(dbt_manifest)
+    except Exception as exc:
+        logger.debug(traceback.format_exc())
+        logger.debug(f"Error fetching dbt files from local {repr(exc)}")
+        return None
+
+
+def get_dbt_http(config) -> Optional[Tuple[str, str]]:
+    try:
+        catalog_file = urllib.request.urlopen(config.dbtConfig.dbtCatalogFilePath)
+        manifest_file = urllib.request.urlopen(config.dbtConfig.dbtManifestFilePath)
+        dbt_catalog = catalog_file.read().decode()
+        dbt_manifest = manifest_file.read().decode()
+        return json.loads(dbt_catalog), json.loads(dbt_manifest)
+    except Exception as exc:
+        logger.debug(traceback.format_exc())
+        logger.debug(f"Error fetching dbt files from http {repr(exc)}")
+        return None
+
+
+def get_dbt_gcs(config) -> Optional[Tuple[str, str]]:
+    try:
+        dbt_options = config.dbtConfig.gcsConfig
+        if config.dbtProvider.value == "gcs":
+            options = {
+                "credentials": {
+                    "type": dbt_options.type,
+                    "project_id": dbt_options.projectId,
+                    "private_key_id": dbt_options.privateKeyId,
+                    "private_key": dbt_options.privateKey,
+                    "client_email": dbt_options.clientEmail,
+                    "client_id": dbt_options.clientId,
+                    "auth_uri": dbt_options.authUri,
+                    "token_uri": dbt_options.tokenUri,
+                    "auth_provider_x509_cert_url": dbt_options.authProviderX509CertUrl,
+                    "client_x509_cert_url": dbt_options.clientX509CertUrl,
+                }
+            }
+        else:
+            options = {"credentials_path": dbt_options.__root__}
+        if store_gcs_credentials(options):
+            from google.cloud import storage
+
+            client = storage.Client()
+            for bucket in client.list_buckets():
+                for blob in client.list_blobs(bucket.name):
+                    if config.dbtManifestFileName in blob.name:
+                        dbt_manifest = blob.download_as_string().decode()
+                    if config.dbtCatalogFileName in blob.name:
+                        dbt_catalog = blob.download_as_string().decode()
+            return json.loads(dbt_catalog), json.loads(dbt_manifest)
+        else:
+            return None
+    except Exception as exc:
+        logger.debug(traceback.format_exc())
+        logger.debug(f"Error fetching dbt files from gcs {repr(exc)}")
+        return None
+
+
+def get_dbt_s3(config) -> Optional[Tuple[str, str]]:
+    try:
+        from metadata.utils.aws_client import AWSClient
+
+        aws_client = AWSClient(config.dbtConfig).get_resource("s3")
+        buckets = aws_client.buckets.all()
+        for bucket in buckets:
+            for bucket_object in bucket.objects.all():
+                if config.dbtManifestFileName in bucket_object.key:
+                    dbt_manifest = bucket_object.get()["Body"].read().decode()
+                if config.dbtCatalogFileName in bucket_object.key:
+                    dbt_catalog = bucket_object.get()["Body"].read().decode()
+        return json.loads(dbt_catalog), json.loads(dbt_manifest)
+    except Exception as exc:
+        logger.debug(traceback.format_exc())
+        logger.debug(f"Error fetching dbt files from s3 {repr(exc)}")
+        return None
+
+
+def get_dbt_details(config) -> Optional[Tuple[str, str]]:
+    try:
+        if config.dbtProvider:
+            if config.dbtProvider.value == "s3":
+                return get_dbt_s3(config)
+            if "gcs" in config.dbtProvider.value:
+                return get_dbt_gcs(config)
+            if config.dbtProvider.value == "http":
+                return get_dbt_http(config)
+            if config.dbtProvider.value == "local":
+                return get_dbt_local(config)
+            raise Exception("Invalid DBT provider")
+    except Exception as exc:
+        logger.debug(traceback.format_exc())
+        logger.debug(f"Error fetching dbt files {repr(exc)}")
+        return None
 
 
 class SQLSource(Source[OMetaDatabaseAndTable]):
     """
     Source Connector implementation to extract
@@ -143,16 +253,9 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         self.data_models = {}
         self.table_constraints = None
         self.database_source_state = set()
-        if self.source_config.dbtCatalogFilePath:
-            with open(
-                self.source_config.dbtCatalogFilePath, "r", encoding="utf-8"
-            ) as catalog:
-                self.dbt_catalog = json.load(catalog)
-        if self.source_config.dbtManifestFilePath:
-            with open(
-                self.source_config.dbtManifestFilePath, "r", encoding="utf-8"
-            ) as manifest:
-                self.dbt_manifest = json.load(manifest)
+        dbt_details = get_dbt_details(self.config.sourceConfig.config)
+        self.dbt_catalog = dbt_details[0] if dbt_details else None
+        self.dbt_manifest = dbt_details[1] if dbt_details else None
         self.profile_date = datetime.now()
 
     def test_connection(self) -> None:
@@ -450,10 +553,7 @@
         """
         Get all the DBT information and feed it to the Table Entity
         """
-        if (
-            self.source_config.dbtManifestFilePath
-            and self.source_config.dbtCatalogFilePath
-        ):
+        if self.source_config.dbtProvider and self.dbt_catalog and self.dbt_manifest:
             logger.info("Parsing Data Models")
             manifest_entities = {
                 **self.dbt_manifest["nodes"],
@@ -506,7 +606,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     table_fqn = get_fqdn(
                         Table,
                         service_name=self.config.serviceName,
-                        dashboard_name=database,
                         table_name=table,
                     ).lower()
                     upstream_nodes.append(table_fqn)
diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py
index bc55cad40df..2b07e79d35d 100644
--- a/ingestion/src/metadata/utils/helpers.py
+++ b/ingestion/src/metadata/utils/helpers.py
@@ -9,7 +9,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 import logging
+import os
+import tempfile
 from datetime import datetime, timedelta
 from typing import Any, Dict, Iterable
 
@@ -154,7 +157,6 @@ def get_dashboard_service_or_create(
         return service
     else:
         dashboard_config = {"config": config}
-        print(dashboard_config)
         created_service = metadata.create_or_update(
             CreateDashboardServiceRequest(
                 name=service_name,
@@ -212,3 +214,34 @@ def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]:
     rows = alchemy_helper.execute_query()
     for row in rows:
         yield row
+
+
+def create_credential_temp_file(credentials: dict) -> str:
+    with tempfile.NamedTemporaryFile(delete=False) as fp:
+        cred_json = json.dumps(credentials, indent=4, separators=(",", ": "))
+        fp.write(cred_json.encode())
+        return fp.name
+
+
+def store_gcs_credentials(options) -> bool:
+    """
+    Method to store GCS credentials from the config file or from a file whose path is provided
+    into the environment variable as required by GCS
+    """
+    if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
+        if options.get("credentials_path"):
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options["credentials_path"]
+            del options["credentials_path"]
+        elif options.get("credentials"):
+            temp_credentials = create_credential_temp_file(
+                credentials=options.get("credentials")
+            )
+            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_credentials
+            del options["credentials"]
+        else:
+            logger.warning(
+                "Please refer to the Google Cloud Storage documentation, especially the credentials part"
+                "https://cloud.google.com/storage/docs/authentication"
+            )
+            return False
+    return True
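For reference, a source config exercising the new fields might look like the sketch below. This is a hypothetical local-provider example assembled from the schema change above: the dbtProvider enum, the localHttpDBTConfig shape, and the file-name defaults come from databaseServiceMetadataPipeline.json, the surrounding sourceConfig wrapper mirrors the object read via self.config.sourceConfig.config in sql_source.py, and the file paths are placeholders.

{
  "sourceConfig": {
    "config": {
      "dbtProvider": "local",
      "dbtConfig": {
        "dbtCatalogFilePath": "/path/to/catalog.json",
        "dbtManifestFilePath": "/path/to/manifest.json"
      }
    }
  }
}

For the "s3", "gcs", and "gcs-path" providers, dbtConfig would instead carry the credentials object defined by s3Credentials.json or gcsCredentials.json, and dbtCatalogFileName / dbtManifestFileName (defaulting to catalog.json and manifest.json) are used to pick the matching objects when listing the buckets.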