ISSUE-2925: Added support to get dbt files via s3, gcs and http sources (#3736)

Author: Onkar Ravgan, 2022-04-19 19:36:24 +05:30 (committed by GitHub)
parent dd2ccb1a89
commit c4aa07858d
6 changed files with 228 additions and 24 deletions

databaseServiceMetadataPipeline.json

@@ -3,6 +3,24 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DatabaseServiceMetadataPipeline",
"description": "DatabaseService Metadata Pipeline Configuration.",
"definitions": {
"localHttpDBTConfig": {
"description": "Local and HTTP DBT configs.",
"type": "object",
"properties": {
"dbtCatalogFilePath": {
"description": "DBT catalog file to extract dbt models with their column schemas.",
"type": "string"
},
"dbtManifestFilePath": {
"description": "DBT manifest file path to extract dbt models and associate with tables.",
"type": "string"
}
},
"additionalProperties": false,
"required": ["dbtCatalogFilePath", "dbtManifestFilePath"]
}
},
"properties": { "properties": {
"markDeletedTables": { "markDeletedTables": {
"description": "Optional configuration to soft delete tables in OpenMetadata if the source tables are deleted.", "description": "Optional configuration to soft delete tables in OpenMetadata if the source tables are deleted.",
@@ -42,13 +60,34 @@
"description": "Regex exclude tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"dbtCatalogFilePath": { "dbtProvider": {
"description": "DBT catalog file to extract dbt models with their column schemas.", "description": "Method from which the DBT files will be fetched. Accepted values are: 's3'(Required aws s3 credentials to be provided), 'gcs'(Required gcs credentials to be provided), 'gcs-path'(path of the file containing gcs credentials), 'local'(path of dbt files on local system), 'http'(url path of dbt files).",
"type": "string" "type": "string",
"enum": ["local", "http", "gcs", "gcs-path", "s3"]
}, },
"dbtManifestFilePath": { "dbtConfig": {
"description": "DBT manifest file path to extract dbt models and associate with tables.", "description": "DBT configuration.",
"type": "string" "oneOf": [
{
"$ref": "#/definitions/localHttpDBTConfig"
},
{
"$ref": "../security/credentials/gcsCredentials.json"
},
{
"$ref": "../security/credentials/s3Credentials.json"
}
]
},
"dbtCatalogFileName": {
"description": "DBT Catalog file name",
"type": "string",
"default": "catalog.json"
},
"dbtManifestFileName": {
"description": "DBT Manifest file name",
"type": "string",
"default": "manifest.json"
}
},
"additionalProperties": false "additionalProperties": false

gcsCredentials.json

@@ -79,4 +79,4 @@
"required": [
"gcsConfig"
]
}

s3Credentials.json (new file)

@@ -0,0 +1,33 @@
{
"$id": "https://open-metadata.org/security/credentials/s3Credentials.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "S3Credentials",
"description": "AWS S3 credentials configs.",
"type": "object",
"javaType": "org.openmetadata.catalog.security.credentials.S3Credentials",
"properties": {
"awsAccessKeyId": {
"description": "AWS Access key ID.",
"type": "string"
},
"awsSecretAccessKey": {
"description": "AWS Secret Access Key.",
"type": "string",
"format": "password"
},
"awsRegion": {
"description": "AWS Region",
"type": "string"
},
"awsSessionToken": {
"description": "AWS Session Token.",
"type": "string"
},
"endPointURL": {
"description": "EndPoint URL for the AWS",
"type": "string"
}
},
"additionalProperties": false,
"required": ["awsAccessKeyId", "awsSecretAccessKey", "awsRegion"]
}
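
As a rough sketch of how these fields map onto an S3 client: the ingestion code itself goes through metadata.utils.aws_client.AWSClient, so the snippet below is only an equivalent boto3 illustration with placeholder values.

import boto3

# Placeholder values standing in for the s3Credentials properties above.
s3 = boto3.resource(
    "s3",
    aws_access_key_id="<awsAccessKeyId>",
    aws_secret_access_key="<awsSecretAccessKey>",
    aws_session_token="<awsSessionToken>",  # optional
    region_name="<awsRegion>",
    endpoint_url="<endPointURL>",           # optional
)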

ingestion/setup.py

@@ -82,7 +82,7 @@ plugins: Dict[str, Set[str]] = {
"bigquery-usage": {"google-cloud-logging", "cachetools"},
"docker": {"python_on_whales==0.34.0"},
"backup": {"boto3~=1.19.12"},
-"dbt": {},
+"dbt": {"google-cloud", "boto3"},
"druid": {"pydruid>=0.6.2"},
"elasticsearch": {"elasticsearch==7.13.1"},
"glue": {"boto3~=1.19.12"},

sql_source.py

@@ -14,6 +14,7 @@ Generic source to build SQL connectors.
import json
import re
import traceback
import urllib.request
import uuid
from dataclasses import dataclass, field
from datetime import datetime
@@ -66,6 +67,7 @@ from metadata.utils.column_type_parser import ColumnTypeParser
from metadata.utils.engines import create_and_bind_session, get_engine, test_connection
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.fqdn_generator import get_fqdn
from metadata.utils.helpers import store_gcs_credentials
logger = ometa_logger()
@@ -102,6 +104,114 @@ def _get_table_description(schema: str, table: str, inspector: Inspector) -> str
    return description
def get_dbt_local(config) -> Optional[Tuple[str, str]]:
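    """Load the dbt catalog and manifest from local file paths."""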
try:
if config.dbtConfig.dbtCatalogFilePath is not None:
with open(
config.dbtConfig.dbtCatalogFilePath, "r", encoding="utf-8"
) as catalog:
dbt_catalog = catalog.read()
if config.dbtConfig.dbtManifestFilePath is not None:
with open(
config.dbtConfig.dbtManifestFilePath, "r", encoding="utf-8"
) as manifest:
dbt_manifest = manifest.read()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files from local {repr(exc)}")
return None
def get_dbt_http(config) -> Optional[Tuple[str, str]]:
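    """Fetch the dbt catalog and manifest from HTTP(S) URLs."""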
try:
catalog_file = urllib.request.urlopen(config.dbtConfig.dbtCatalogFilePath)
manifest_file = urllib.request.urlopen(config.dbtConfig.dbtManifestFilePath)
dbt_catalog = catalog_file.read().decode()
dbt_manifest = manifest_file.read().decode()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files from http {repr(exc)}")
return None
def get_dbt_gcs(config) -> Optional[Tuple[str, str]]:
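    """Download the dbt catalog and manifest from GCS buckets using the configured credentials."""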
try:
dbt_options = config.dbtConfig.gcsConfig
if config.dbtProvider.value == "gcs":
options = {
"credentials": {
"type": dbt_options.type,
"project_id": dbt_options.projectId,
"private_key_id": dbt_options.privateKeyId,
"private_key": dbt_options.privateKey,
"client_email": dbt_options.clientEmail,
"client_id": dbt_options.clientId,
"auth_uri": dbt_options.authUri,
"token_uri": dbt_options.tokenUri,
"auth_provider_x509_cert_url": dbt_options.authProviderX509CertUrl,
"client_x509_cert_url": dbt_options.clientX509CertUrl,
}
}
else:
options = {"credentials_path": dbt_options.__root__}
if store_gcs_credentials(options):
from google.cloud import storage
client = storage.Client()
for bucket in client.list_buckets():
for blob in client.list_blobs(bucket.name):
if config.dbtManifestFileName in blob.name:
dbt_manifest = blob.download_as_string().decode()
if config.dbtCatalogFileName in blob.name:
dbt_catalog = blob.download_as_string().decode()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
else:
return None
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files from gcs {repr(exc)}")
return None
def get_dbt_s3(config) -> Optional[Tuple[str, str]]:
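    """Download the dbt catalog and manifest from S3 buckets using the configured AWS credentials."""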
try:
from metadata.utils.aws_client import AWSClient
aws_client = AWSClient(config.dbtConfig).get_resource("s3")
buckets = aws_client.buckets.all()
for bucket in buckets:
for bucket_object in bucket.objects.all():
if config.dbtManifestFileName in bucket_object.key:
dbt_manifest = bucket_object.get()["Body"].read().decode()
if config.dbtCatalogFileName in bucket_object.key:
dbt_catalog = bucket_object.get()["Body"].read().decode()
return json.loads(dbt_catalog), json.loads(dbt_manifest)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files from s3 {repr(exc)}")
return None
def get_dbt_details(config) -> Optional[Tuple[str, str]]:
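    """Pick the provider-specific fetcher based on dbtProvider and return (catalog, manifest)."""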
try:
if config.dbtProvider:
if config.dbtProvider.value == "s3":
return get_dbt_s3(config)
if "gcs" in config.dbtProvider.value:
return get_dbt_gcs(config)
if config.dbtProvider.value == "http":
return get_dbt_http(config)
if config.dbtProvider.value == "local":
return get_dbt_local(config)
raise Exception("Invalid DBT provider")
except Exception as exc:
logger.debug(traceback.format_exc())
logger.debug(f"Error fetching dbt files {repr(exc)}")
return None
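
A minimal sketch of how the dispatcher is exercised, using SimpleNamespace stand-ins for the parsed pydantic config (the paths and values are hypothetical):

from types import SimpleNamespace

# Stand-in for sourceConfig.config with a "local" provider (illustrative only).
config = SimpleNamespace(
    dbtProvider=SimpleNamespace(value="local"),
    dbtConfig=SimpleNamespace(
        dbtCatalogFilePath="/tmp/catalog.json",
        dbtManifestFilePath="/tmp/manifest.json",
    ),
)
dbt_details = get_dbt_details(config)  # -> (catalog_dict, manifest_dict), or None on failure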
class SQLSource(Source[OMetaDatabaseAndTable]):
    """
    Source Connector implementation to extract
@@ -143,16 +253,9 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.data_models = {}
self.table_constraints = None
self.database_source_state = set()
-if self.source_config.dbtCatalogFilePath:
-    with open(
-        self.source_config.dbtCatalogFilePath, "r", encoding="utf-8"
-    ) as catalog:
-        self.dbt_catalog = json.load(catalog)
-if self.source_config.dbtManifestFilePath:
-    with open(
-        self.source_config.dbtManifestFilePath, "r", encoding="utf-8"
-    ) as manifest:
-        self.dbt_manifest = json.load(manifest)
+dbt_details = get_dbt_details(self.config.sourceConfig.config)
+self.dbt_catalog = dbt_details[0] if dbt_details else None
+self.dbt_manifest = dbt_details[1] if dbt_details else None
self.profile_date = datetime.now()
def test_connection(self) -> None:
@@ -450,10 +553,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
"""
Get all the DBT information and feed it to the Table Entity
"""
-if (
-    self.source_config.dbtManifestFilePath
-    and self.source_config.dbtCatalogFilePath
-):
+if self.source_config.dbtProvider and self.dbt_catalog and self.dbt_manifest:
logger.info("Parsing Data Models")
manifest_entities = {
**self.dbt_manifest["nodes"],
@@ -506,7 +606,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
table_fqn = get_fqdn(
Table,
service_name=self.config.serviceName,
-dashboard_name=database,
table_name=table,
).lower()
upstream_nodes.append(table_fqn)

metadata/utils/helpers.py

@@ -9,7 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import tempfile
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable
@@ -154,7 +157,6 @@ def get_dashboard_service_or_create(
return service
else:
dashboard_config = {"config": config}
-print(dashboard_config)
created_service = metadata.create_or_update(
CreateDashboardServiceRequest(
name=service_name,
@@ -212,3 +214,34 @@ def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]:
rows = alchemy_helper.execute_query()
for row in rows:
yield row
def create_credential_temp_file(credentials: dict) -> str:
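    """Serialize the credentials dict to a temporary JSON file and return its path."""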
with tempfile.NamedTemporaryFile(delete=False) as fp:
cred_json = json.dumps(credentials, indent=4, separators=(",", ": "))
fp.write(cred_json.encode())
return fp.name
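
A quick illustration with a placeholder service-account dict; the returned path can then be exported as GOOGLE_APPLICATION_CREDENTIALS (see store_gcs_credentials below):

# Hypothetical usage (placeholder credentials).
cred_path = create_credential_temp_file({"type": "service_account", "project_id": "demo"})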
def store_gcs_credentials(options) -> bool:
"""
    Store GCS credentials, passed either inline in the config or as a path to a
    credentials file, into the environment variable required by the GCS client.
"""
if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
if options.get("credentials_path"):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = options["credentials_path"]
del options["credentials_path"]
elif options.get("credentials"):
temp_credentials = create_credential_temp_file(
credentials=options.get("credentials")
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_credentials
del options["credentials"]
else:
logger.warning(
"Please refer to the Google Cloud Storage documentation, especially the credentials part"
"https://cloud.google.com/storage/docs/authentication"
)
return False
return True
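
A sketch of the two option shapes the helper accepts, matching how get_dbt_gcs in sql_source.py builds them (placeholder values):

# Either a path to an existing service-account file...
store_gcs_credentials({"credentials_path": "/path/to/service-account.json"})

# ...or an inline credentials dict, which is written to a temporary file first.
store_gcs_credentials({"credentials": {"type": "service_account", "project_id": "demo"}})

# After either call succeeds, google.cloud.storage.Client() picks the key up from
# the GOOGLE_APPLICATION_CREDENTIALS environment variable.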