DBT ingestion improvements (#9383)

* DBT ingestion improvements

* use fqn build instead of join

* fix linting

* fix linting
This commit is contained in:
Mayur Singal 2022-12-19 11:07:54 +05:30 committed by GitHub
parent e1919af86a
commit 425a50bc3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 47 deletions

View File

@ -510,16 +510,20 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
logger.info(f"Processing DBT Query lineage for: {table_fqn}")
try:
source_elements = table_fqn.split(".")
source_elements = fqn.split(table_fqn)
# remove service name from fqn to make it parseable in format db.schema.table
query_fqn = fqn._build( # pylint: disable=protected-access
*source_elements[-3:]
)
query = (
f"create table {table_fqn} as {data_model_link.datamodel.sql.__root__}"
f"create table {query_fqn} as {data_model_link.datamodel.sql.__root__}"
)
lineages = get_lineage_by_query(
self.metadata,
query=query,
service_name=source_elements[1],
database_name=source_elements[2],
schema_name=source_elements[3],
service_name=source_elements[0],
database_name=source_elements[1],
schema_name=source_elements[2],
)
for lineage_request in lineages or []:
yield lineage_request

View File

@ -13,9 +13,7 @@ DBT service Topology.
"""
from abc import ABC, abstractmethod
from typing import Iterable, Optional
from pydantic import BaseModel
from typing import Iterable
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
@ -34,18 +32,12 @@ from metadata.ingestion.models.topology import (
create_source_context,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.utils.dbt_config import get_dbt_details
from metadata.utils.dbt_config import DbtFiles, get_dbt_details
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DbtFiles(BaseModel):
dbt_catalog: Optional[dict]
dbt_manifest: Optional[dict]
dbt_run_results: Optional[dict]
class DbtServiceTopology(ServiceTopology):
"""
Defines the hierarchy in Database Services.
@ -139,14 +131,9 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
context = create_source_context(topology)
def get_dbt_files(self) -> DbtFiles:
dbt_details = get_dbt_details(
dbt_files = get_dbt_details(
self.source_config.dbtConfigSource # pylint: disable=no-member
)
dbt_files = DbtFiles(
dbt_catalog=dbt_details[0],
dbt_manifest=dbt_details[1],
dbt_run_results=dbt_details[2],
)
yield dbt_files
@abstractmethod

View File

@ -17,6 +17,7 @@ from functools import singledispatch
from typing import Optional, Tuple
import requests
from pydantic import BaseModel
from metadata.generated.schema.metadataIngestion.dbtconfig.dbtCloudConfig import (
DbtCloudConfig,
@ -43,6 +44,18 @@ DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_RUN_RESULTS_FILE_NAME = "run_results.json"
class DbtFiles(BaseModel):
dbt_catalog: Optional[dict]
dbt_manifest: Optional[dict]
dbt_run_results: Optional[dict]
class DBTConfigException(Exception):
"""
Raise when encountering errors while extracting dbt files
"""
@singledispatch
def get_dbt_details(config):
"""
@ -80,15 +93,14 @@ def _(config: DbtLocalConfig):
)
with open(config.dbtCatalogFilePath, "r", encoding="utf-8") as catalog:
dbt_catalog = catalog.read()
return (
json.loads(dbt_catalog) if dbt_catalog else None,
json.loads(dbt_manifest),
json.loads(dbt_run_results) if dbt_run_results else None,
return DbtFiles(
dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None,
dbt_manifest=json.loads(dbt_manifest),
dbt_run_results=json.loads(dbt_run_results) if dbt_run_results else None,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching dbt files from local: {exc}")
return None
raise DBTConfigException(f"Error fetching dbt files from local: {exc}")
@get_dbt_details.register
@ -117,15 +129,18 @@ def _(config: DbtHttpConfig):
dbt_catalog = requests.get( # pylint: disable=missing-timeout
config.dbtCatalogHttpPath
)
return (
json.loads(dbt_catalog.text) if dbt_catalog else None,
json.loads(dbt_manifest.text),
json.loads(dbt_run_results.text) if dbt_run_results else None,
if not dbt_manifest:
raise DBTConfigException("Menifest file not found in file server")
return DbtFiles(
dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None,
dbt_manifest=json.loads(dbt_manifest),
dbt_run_results=json.loads(dbt_run_results) if dbt_run_results else None,
)
except DBTConfigException as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching dbt files from file server: {exc}")
return None
raise DBTConfigException(f"Error fetching dbt files from file server: {exc}")
@get_dbt_details.register
@ -173,10 +188,19 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals
dbt_run_results = client.get(
f"/accounts/{account_id}/runs/{run_id}/artifacts/{DBT_RUN_RESULTS_FILE_NAME}"
)
if not dbt_manifest:
raise DBTConfigException("Menifest file not found in DBT Cloud")
return DbtFiles(
dbt_catalog=dbt_catalog,
dbt_manifest=dbt_manifest,
dbt_run_results=dbt_run_results,
)
except DBTConfigException as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching dbt files from DBT Cloud: {exc}")
return dbt_catalog, dbt_manifest, dbt_run_results
raise DBTConfigException(f"Error fetching dbt files from DBT Cloud: {exc}")
@get_dbt_details.register
@ -211,15 +235,18 @@ def _(config: DbtS3Config):
if DBT_RUN_RESULTS_FILE_NAME in bucket_object.key:
logger.debug(f"{DBT_RUN_RESULTS_FILE_NAME} found")
dbt_run_results = bucket_object.get()["Body"].read().decode()
return (
json.loads(dbt_catalog) if dbt_catalog else None,
json.loads(dbt_manifest),
json.loads(dbt_run_results) if dbt_run_results else None,
if not dbt_manifest:
raise DBTConfigException("Menifest file not found in s3")
return DbtFiles(
dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None,
dbt_manifest=json.loads(dbt_manifest),
dbt_run_results=json.loads(dbt_run_results) if dbt_run_results else None,
)
except DBTConfigException as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching dbt files from s3: {exc}")
return dbt_catalog, dbt_manifest
raise DBTConfigException(f"Error fetching dbt files from s3: {exc}")
@get_dbt_details.register
@ -252,15 +279,18 @@ def _(config: DbtGcsConfig):
if DBT_RUN_RESULTS_FILE_NAME in blob.name:
logger.debug(f"{DBT_RUN_RESULTS_FILE_NAME} found")
dbt_run_results = blob.download_as_string().decode()
return (
json.loads(dbt_catalog) if dbt_catalog else None,
json.loads(dbt_manifest),
json.loads(dbt_run_results) if dbt_run_results else None,
if not dbt_manifest:
raise DBTConfigException("Menifest file not found in gcs")
return DbtFiles(
dbt_catalog=json.loads(dbt_catalog) if dbt_catalog else None,
dbt_manifest=json.loads(dbt_manifest),
dbt_run_results=json.loads(dbt_run_results) if dbt_run_results else None,
)
except DBTConfigException as exc:
raise exc
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching dbt files from gcs: {exc}")
return dbt_catalog, dbt_manifest
raise DBTConfigException(f"Error fetching dbt files from gcs: {exc}")
def get_dbt_prefix_config(config) -> Tuple[Optional[str], Optional[str]]: