Fixed dbt Manifest and Run results parsing (#18234)

This commit is contained in:
Suman Maharana 2024-10-14 13:07:22 +05:30 committed by Onkar Ravgan
parent 86507159b2
commit 63bff7aafa
2 changed files with 84 additions and 45 deletions

View File

@ -22,6 +22,61 @@ REQUIRED_MANIFEST_KEYS = ["name", "schema", "resource_type"]
# Based on https://schemas.getdbt.com/dbt/catalog/v1.json
REQUIRED_CATALOG_KEYS = ["name", "type", "index"]
REQUIRED_RESULTS_KEYS = {
"status",
"timing",
"thread_id",
"execution_time",
"message",
"adapter_response",
"unique_id",
}
REQUIRED_NODE_KEYS = {
"schema_",
"schema",
"freshness",
"name",
"resource_type",
"path",
"unique_id",
"source_name",
"source_description",
"source_meta",
"loader",
"identifier",
"relation_name",
"fqn",
"alias",
"checksum",
"config",
"column_name",
"test_metadata",
"original_file_path",
"root_path",
"database",
"tags",
"description",
"columns",
"meta",
"owner",
"created_at",
"group",
"sources",
"compiled",
"docs",
"version",
"latest_version",
"package_name",
"depends_on",
"compiled_code",
"compiled_sql",
"raw_code",
"raw_sql",
"language",
}
NONE_KEYWORDS_LIST = ["none", "null"]
DBT_CATALOG_FILE_NAME = "catalog.json"

View File

@ -13,7 +13,7 @@ DBT service Topology.
"""
from abc import ABC, abstractmethod
from typing import Iterable
from typing import Iterable, List
from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results
from pydantic import Field
@ -37,6 +37,10 @@ from metadata.ingestion.models.topology import (
TopologyNode,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.constants import (
REQUIRED_NODE_KEYS,
REQUIRED_RESULTS_KEYS,
)
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
from metadata.ingestion.source.database.dbt.models import (
DbtFiles,
@ -169,51 +173,27 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
}
)
required_nodes_keys = {
"schema_",
"schema",
"name",
"resource_type",
"path",
"unique_id",
"fqn",
"alias",
"checksum",
"config",
"column_name",
"test_metadata",
"original_file_path",
"root_path",
"database",
"tags",
"description",
"columns",
"meta",
"owner",
"created_at",
"group",
"sources",
"compiled",
"docs",
"version",
"latest_version",
"package_name",
"depends_on",
"compiled_code",
"compiled_sql",
"raw_code",
"raw_sql",
"language",
}
for field in ["nodes", "sources"]:
for node, value in manifest_dict.get( # pylint: disable=unused-variable
field
).items():
keys_to_delete = [
key for key in value if key.lower() not in REQUIRED_NODE_KEYS
]
for key in keys_to_delete:
del value[key]
for node, value in manifest_dict.get( # pylint: disable=unused-variable
"nodes"
).items():
keys_to_delete = [
key for key in value if key.lower() not in required_nodes_keys
]
for key in keys_to_delete:
del value[key]
def remove_run_result_non_required_keys(self, run_results: List[dict]):
"""
Method to remove the non required keys from run results file
"""
for run_result in run_results:
for result in run_result.get("results"):
keys_to_delete = [
key for key in result if key.lower() not in REQUIRED_RESULTS_KEYS
]
for key in keys_to_delete:
del result[key]
def get_dbt_files(self) -> Iterable[DbtFiles]:
dbt_files = get_dbt_details(self.source_config.dbtConfigSource)
@ -225,6 +205,10 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
self.remove_manifest_non_required_keys(
manifest_dict=self.context.get().dbt_file.dbt_manifest
)
if self.context.get().dbt_file.dbt_run_results:
self.remove_run_result_non_required_keys(
run_results=self.context.get().dbt_file.dbt_run_results
)
dbt_objects = DbtObjects(
dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog)
if self.context.get().dbt_file.dbt_catalog