Added DBT Workflow fixes (#9419)

* Added DBT fixes

* Addressed review comments
This commit is contained in:
Onkar Ravgan 2022-12-20 19:06:01 +05:30 committed by GitHub
parent f81867dd97
commit c1fdc59e84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 23 deletions

View File

@ -286,28 +286,30 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
self.context.dbt_tests = {}
for key, manifest_node in manifest_entities.items():
try:
# Skip the analysis node since it does not contain relevant metatada
if manifest_node["resource_type"] in ["analysis"]:
continue
# If the run_results file is passed then only DBT tests will be processed
if dbt_files.dbt_run_results:
if (
dbt_files.dbt_run_results
and manifest_node["resource_type"] == "test"
):
# Test nodes will be processed further in the topology
if manifest_node["resource_type"] == "test":
self.context.dbt_tests[key] = manifest_node
self.context.dbt_tests[key][
"upstream"
] = self.parse_upstream_nodes(
manifest_entities, manifest_node
)
self.context.dbt_tests[key][
"results"
] = next( # pylint: disable=stop-iteration-return
item
for item in dbt_files.dbt_run_results.get("results")
if item["unique_id"] == key
)
continue
self.context.dbt_tests[key] = manifest_node
self.context.dbt_tests[key][
"upstream"
] = self.parse_upstream_nodes(manifest_entities, manifest_node)
self.context.dbt_tests[key][
"results"
] = next( # pylint: disable=stop-iteration-return
item
for item in dbt_files.dbt_run_results.get("results")
if item["unique_id"] == key
)
continue
# Skip the analysis and test nodes
if manifest_node["resource_type"] in ("analysis", "test"):
logger.info(f"Skipping DBT node: {key}.")
continue
model_name = (
manifest_node["alias"]

View File

@ -79,7 +79,7 @@ def _(config: DbtLocalConfig):
)
with open(config.dbtManifestFilePath, "r", encoding="utf-8") as manifest:
dbt_manifest = manifest.read()
if config.dbtRunResultsFilePath is not None:
if config.dbtRunResultsFilePath:
logger.debug(
f"Reading [dbtRunResultsFilePath] from: {config.dbtRunResultsFilePath}"
)
@ -87,7 +87,7 @@ def _(config: DbtLocalConfig):
config.dbtRunResultsFilePath, "r", encoding="utf-8"
) as run_results:
dbt_run_results = run_results.read()
if config.dbtCatalogFilePath is not None:
if config.dbtCatalogFilePath:
logger.debug(
f"Reading [dbtCatalogFilePath] from: {config.dbtCatalogFilePath}"
)
@ -113,7 +113,7 @@ def _(config: DbtHttpConfig):
config.dbtManifestHttpPath
)
dbt_run_results = None
if config.dbtRunResultsHttpPath is not None:
if config.dbtRunResultsHttpPath:
logger.debug(
f"Requesting [dbtRunResultsHttpPath] to: {config.dbtRunResultsHttpPath}"
)
@ -122,7 +122,7 @@ def _(config: DbtHttpConfig):
)
dbt_catalog = None
if config.dbtCatalogHttpPath is not None:
if config.dbtCatalogHttpPath:
logger.debug(
f"Requesting [dbtCatalogHttpPath] to: {config.dbtCatalogHttpPath}"
)