From 739e0efa61bd3280e533fc8ae0f02600b2c5d97f Mon Sep 17 00:00:00 2001 From: Kevin Hu <6051736+kevinhu@users.noreply.github.com> Date: Tue, 6 Jul 2021 19:32:00 -0700 Subject: [PATCH] fix(ingest): do not fail dbt ingestion when encountering missing nodes (#2833) --- .../src/datahub/ingestion/source/dbt.py | 41 +++++++++++++++---- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt.py b/metadata-ingestion/src/datahub/ingestion/source/dbt.py index fa9832db8a..77dd6da5d6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt.py @@ -97,19 +97,20 @@ def get_columns(catalog_node: dict) -> List[DBTColumn]: def extract_dbt_entities( - nodes: Dict[str, Dict[str, Any]], - catalog: Dict[str, Dict[str, Any]], + all_manifest_entities: Dict[str, Dict[str, Any]], + all_catalog_entities: Dict[str, Dict[str, Any]], sources_results: List[Dict[str, Any]], load_catalog: bool, target_platform: str, environment: str, node_type_pattern: AllowDenyPattern, + report: SourceReport, ) -> List[DBTNode]: sources_by_id = {x["unique_id"]: x for x in sources_results} dbt_entities = [] - for key, node in nodes.items(): + for key, node in all_manifest_entities.items(): dbtNode = DBTNode() # check if node pattern allowed based on config file @@ -131,23 +132,42 @@ def extract_dbt_entities( dbtNode.materialization = node["config"]["materialized"] dbtNode.upstream_urns = get_upstreams( node["depends_on"]["nodes"], - nodes, + all_manifest_entities, load_catalog, target_platform, environment, ) else: # It's a source - dbtNode.materialization = catalog[key]["metadata"][ - "type" - ] # get materialization from catalog? required? - dbtNode.upstream_urns = [] + catalog_node = all_catalog_entities.get(key) + + if catalog_node is None: + report.report_warning( + key, + f"Entity {dbtNode.dbt_name} is in manifest but missing from catalog", + ) + + else: + + dbtNode.materialization = all_catalog_entities[key]["metadata"][ + "type" + ] # get materialization from catalog? required? + dbtNode.upstream_urns = [] if ( dbtNode.materialization != "ephemeral" and load_catalog ): # we don't want columns if platform isn't 'dbt' logger.debug("Loading schema info") - dbtNode.columns = get_columns(catalog[dbtNode.dbt_name]) + catalog_node = all_catalog_entities.get(dbtNode.dbt_name) + + if catalog_node is None: + report.report_warning( + key, + f"Entity {dbtNode.dbt_name} is in manifest but missing from catalog", + ) + else: + dbtNode.columns = get_columns(catalog_node) + else: dbtNode.columns = [] @@ -172,6 +192,7 @@ def loadManifestAndCatalog( target_platform: str, environment: str, node_type_pattern: AllowDenyPattern, + report: SourceReport, ) -> List[DBTNode]: with open(manifest_path, "r") as manifest: dbt_manifest_json = json.load(manifest) @@ -204,6 +225,7 @@ def loadManifestAndCatalog( target_platform, environment, node_type_pattern, + report, ) return nodes @@ -376,6 +398,7 @@ class DBTSource(Source): self.config.target_platform, self.config.env, self.config.node_type_pattern, + self.report, ) for node in nodes: