mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-10 09:35:39 +00:00
fix(ingest): do not fail dbt ingestion when encountering missing nodes (#2833)
This commit is contained in:
parent
2d1dd95a84
commit
739e0efa61
@ -97,19 +97,20 @@ def get_columns(catalog_node: dict) -> List[DBTColumn]:
|
||||
|
||||
|
||||
def extract_dbt_entities(
|
||||
nodes: Dict[str, Dict[str, Any]],
|
||||
catalog: Dict[str, Dict[str, Any]],
|
||||
all_manifest_entities: Dict[str, Dict[str, Any]],
|
||||
all_catalog_entities: Dict[str, Dict[str, Any]],
|
||||
sources_results: List[Dict[str, Any]],
|
||||
load_catalog: bool,
|
||||
target_platform: str,
|
||||
environment: str,
|
||||
node_type_pattern: AllowDenyPattern,
|
||||
report: SourceReport,
|
||||
) -> List[DBTNode]:
|
||||
|
||||
sources_by_id = {x["unique_id"]: x for x in sources_results}
|
||||
|
||||
dbt_entities = []
|
||||
for key, node in nodes.items():
|
||||
for key, node in all_manifest_entities.items():
|
||||
dbtNode = DBTNode()
|
||||
|
||||
# check if node pattern allowed based on config file
|
||||
@ -131,23 +132,42 @@ def extract_dbt_entities(
|
||||
dbtNode.materialization = node["config"]["materialized"]
|
||||
dbtNode.upstream_urns = get_upstreams(
|
||||
node["depends_on"]["nodes"],
|
||||
nodes,
|
||||
all_manifest_entities,
|
||||
load_catalog,
|
||||
target_platform,
|
||||
environment,
|
||||
)
|
||||
else:
|
||||
# It's a source
|
||||
dbtNode.materialization = catalog[key]["metadata"][
|
||||
"type"
|
||||
] # get materialization from catalog? required?
|
||||
dbtNode.upstream_urns = []
|
||||
catalog_node = all_catalog_entities.get(key)
|
||||
|
||||
if catalog_node is None:
|
||||
report.report_warning(
|
||||
key,
|
||||
f"Entity {dbtNode.dbt_name} is in manifest but missing from catalog",
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
dbtNode.materialization = all_catalog_entities[key]["metadata"][
|
||||
"type"
|
||||
] # get materialization from catalog? required?
|
||||
dbtNode.upstream_urns = []
|
||||
|
||||
if (
|
||||
dbtNode.materialization != "ephemeral" and load_catalog
|
||||
): # we don't want columns if platform isn't 'dbt'
|
||||
logger.debug("Loading schema info")
|
||||
dbtNode.columns = get_columns(catalog[dbtNode.dbt_name])
|
||||
catalog_node = all_catalog_entities.get(dbtNode.dbt_name)
|
||||
|
||||
if catalog_node is None:
|
||||
report.report_warning(
|
||||
key,
|
||||
f"Entity {dbtNode.dbt_name} is in manifest but missing from catalog",
|
||||
)
|
||||
else:
|
||||
dbtNode.columns = get_columns(catalog_node)
|
||||
|
||||
else:
|
||||
dbtNode.columns = []
|
||||
|
||||
@ -172,6 +192,7 @@ def loadManifestAndCatalog(
|
||||
target_platform: str,
|
||||
environment: str,
|
||||
node_type_pattern: AllowDenyPattern,
|
||||
report: SourceReport,
|
||||
) -> List[DBTNode]:
|
||||
with open(manifest_path, "r") as manifest:
|
||||
dbt_manifest_json = json.load(manifest)
|
||||
@ -204,6 +225,7 @@ def loadManifestAndCatalog(
|
||||
target_platform,
|
||||
environment,
|
||||
node_type_pattern,
|
||||
report,
|
||||
)
|
||||
|
||||
return nodes
|
||||
@ -376,6 +398,7 @@ class DBTSource(Source):
|
||||
self.config.target_platform,
|
||||
self.config.env,
|
||||
self.config.node_type_pattern,
|
||||
self.report,
|
||||
)
|
||||
|
||||
for node in nodes:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user