From 4add9b157de8a2f07e68a78e29d46f1c89012711 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 25 Apr 2024 13:29:51 -0700 Subject: [PATCH] feat(ingest/dbt): use columns from manifest as a fallback (#10374) --- .../datahub/ingestion/source/dbt/dbt_core.py | 38 ++++++++++++------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index 581e1cc8e0..0fc35ddd28 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -101,17 +101,31 @@ class DBTCoreConfig(DBTCommonConfig): def get_columns( - catalog_node: dict, + dbt_name: str, + catalog_node: Optional[dict], manifest_node: dict, tag_prefix: str, ) -> List[DBTColumn]: - columns = [] - - catalog_columns = catalog_node["columns"] manifest_columns = manifest_node.get("columns", {}) - manifest_columns_lower = {k.lower(): v for k, v in manifest_columns.items()} + if catalog_node is not None: + logger.debug(f"Loading schema info for {dbt_name}") + catalog_columns = catalog_node["columns"] + elif manifest_columns: + # If the end user ran `dbt compile` instead of `dbt docs generate`, then the catalog + # file will not have any column information. In this case, we will fall back to using + # information from the manifest file. + logger.debug(f"Inferring schema info for {dbt_name} from manifest") + catalog_columns = { + k: {"name": col["name"], "type": col["data_type"], "index": i} + for i, (k, col) in enumerate(manifest_columns.items()) + } + else: + logger.debug(f"Missing schema info for {dbt_name}") + return [] + + columns = [] for key, catalog_column in catalog_columns.items(): manifest_column = manifest_columns.get( key, manifest_columns_lower.get(key.lower(), {}) @@ -264,14 +278,12 @@ def extract_dbt_entities( "ephemeral", "test", ]: - logger.debug(f"Loading schema info for {dbtNode.dbt_name}") - if catalog_node is not None: - # We already have done the reporting for catalog_node being None above. - dbtNode.columns = get_columns( - catalog_node, - manifest_node, - tag_prefix, - ) + dbtNode.columns = get_columns( + dbtNode.dbt_name, + catalog_node, + manifest_node, + tag_prefix, + ) else: dbtNode.columns = []