feat(ingest/dbt): use columns from manifest as a fallback (#10374)

This commit is contained in:
Harshal Sheth 2024-04-25 13:29:51 -07:00 committed by GitHub
parent 4c40a24d76
commit 4add9b157d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -101,17 +101,31 @@ class DBTCoreConfig(DBTCommonConfig):
def get_columns( def get_columns(
catalog_node: dict, dbt_name: str,
catalog_node: Optional[dict],
manifest_node: dict, manifest_node: dict,
tag_prefix: str, tag_prefix: str,
) -> List[DBTColumn]: ) -> List[DBTColumn]:
columns = []
catalog_columns = catalog_node["columns"]
manifest_columns = manifest_node.get("columns", {}) manifest_columns = manifest_node.get("columns", {})
manifest_columns_lower = {k.lower(): v for k, v in manifest_columns.items()} manifest_columns_lower = {k.lower(): v for k, v in manifest_columns.items()}
if catalog_node is not None:
logger.debug(f"Loading schema info for {dbt_name}")
catalog_columns = catalog_node["columns"]
elif manifest_columns:
# If the end user ran `dbt compile` instead of `dbt docs generate`, then the catalog
# file will not have any column information. In this case, we will fall back to using
# information from the manifest file.
logger.debug(f"Inferring schema info for {dbt_name} from manifest")
catalog_columns = {
k: {"name": col["name"], "type": col["data_type"], "index": i}
for i, (k, col) in enumerate(manifest_columns.items())
}
else:
logger.debug(f"Missing schema info for {dbt_name}")
return []
columns = []
for key, catalog_column in catalog_columns.items(): for key, catalog_column in catalog_columns.items():
manifest_column = manifest_columns.get( manifest_column = manifest_columns.get(
key, manifest_columns_lower.get(key.lower(), {}) key, manifest_columns_lower.get(key.lower(), {})
@ -264,14 +278,12 @@ def extract_dbt_entities(
"ephemeral", "ephemeral",
"test", "test",
]: ]:
logger.debug(f"Loading schema info for {dbtNode.dbt_name}") dbtNode.columns = get_columns(
if catalog_node is not None: dbtNode.dbt_name,
# We already have done the reporting for catalog_node being None above. catalog_node,
dbtNode.columns = get_columns( manifest_node,
catalog_node, tag_prefix,
manifest_node, )
tag_prefix,
)
else: else:
dbtNode.columns = [] dbtNode.columns = []