From 6aa133f99cd9d1a5b65ac203c639ba917e330472 Mon Sep 17 00:00:00 2001
From: Remi
Date: Tue, 25 May 2021 19:59:35 -0600
Subject: [PATCH] fix(ingest): fix lineage after dbt metadata ingestion when table name and identifier differ (#2596)

---
 metadata-ingestion/README.md                  | 18 ++++++-----
 .../src/datahub/ingestion/source/dbt.py       | 31 ++++++++++++++-----
 2 files changed, 33 insertions(+), 16 deletions(-)

diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 6fdbc44f43..90808ebf9b 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -53,7 +53,7 @@ We use a plugin architecture so that you can install only the dependencies you a
 | lookml | `pip install 'acryl-datahub[lookml]'` | LookML source, requires Python 3.7+ |
 | kafka | `pip install 'acryl-datahub[kafka]'` | Kafka source |
 | druid | `pip install 'acryl-datahub[druid]'` | Druid Source |
-| dbt | _no additional dependencies_ | DBT source |
+| dbt | _no additional dependencies_ | dbt source |
 | datahub-rest | `pip install 'acryl-datahub[datahub-rest]'` | DataHub sink over REST API |
 | datahub-kafka | `pip install 'acryl-datahub[datahub-kafka]'` | DataHub sink over Kafka |
 
@@ -595,19 +595,19 @@ source:
   filename: ./path/to/mce/file.json
 ```
 
-### DBT `dbt`
+### dbt `dbt`
 
-Pull metadata from DBT output files:
+Pull metadata from dbt artifact files:
 
 - [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
   - This file contains model, source and lineage data.
 - [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
   - This file contains schema data.
-  - DBT does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models
+  - dbt does not record schema data for ephemeral models, so DataHub will still show ephemeral models in the lineage, but without an associated schema.
 - target_platform:
-  - The data platform you are enriching with DBT metadata.
+  - The data platform you are enriching with dbt metadata.
   - [data platforms](https://github.com/linkedin/datahub/blob/master/gms/impl/src/main/resources/DataPlatformInfo.json)
-- load_schema:
+- load_schemas:
   - Load schemas from dbt catalog file, not necessary when the underlying data platform already has this data.
 
 ```yml
 source:
   type: "dbt"
   config:
     manifest_path: "./path/dbt/manifest_file.json"
     catalog_path: "./path/dbt/catalog_file.json"
-    target_platform: "postgres" # optional eg postgres, snowflake etc.
-    load_schema: True / False
+    target_platform: "postgres" # optional, e.g. "postgres", "snowflake", etc.
+    load_schemas: True or False
 ```
 
+Note: when `load_schemas` is False, models that use [identifiers](https://docs.getdbt.com/reference/resource-properties/identifier) to reference their source tables are ingested using the model identifier as the model name, so that lineage is preserved.
+
 ### Kafka Connect `kafka-connect`
 
 Extracts:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt.py b/metadata-ingestion/src/datahub/ingestion/source/dbt.py
index 48d0d7de6a..a7a78c9295 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt.py
@@ -58,7 +58,7 @@ class DBTNode:
     dbt_file_path: str
     node_type: str  # source, model
     materialization: str  # table, view, ephemeral
-    name: str
+    name: str  # name, identifier
     columns: List[DBTColumn]
     upstream_urns: List[str]
     datahub_urn: str
@@ -98,17 +98,24 @@ def extract_dbt_entities(
         dbtNode = DBTNode()
 
         dbtNode.dbt_name = key
-        dbtNode.node_type = node["resource_type"]
-        dbtNode.name = node["name"]
         dbtNode.database = node["database"]
         dbtNode.schema = node["schema"]
         dbtNode.dbt_file_path = node["original_file_path"]
+        dbtNode.node_type = node["resource_type"]
+        if "identifier" in node and load_catalog is False:
+            dbtNode.name = node["identifier"]
+        else:
+            dbtNode.name = node["name"]
 
         if "materialized" in node["config"].keys():
             # It's a model
             dbtNode.materialization = node["config"]["materialized"]
             dbtNode.upstream_urns = get_upstreams(
-                node["depends_on"]["nodes"], nodes, target_platform, environment
+                node["depends_on"]["nodes"],
+                nodes,
+                load_catalog,
+                target_platform,
+                environment,
             )
         else:
             # It's a source
@@ -190,19 +197,27 @@ def get_custom_properties(node: DBTNode) -> Dict[str, str]:
 def get_upstreams(
     upstreams: List[str],
     all_nodes: Dict[str, dict],
+    load_catalog: bool,
     target_platform: str,
     environment: str,
 ) -> List[str]:
     upstream_urns = []
 
     for upstream in upstreams:
-        upstream_node = all_nodes[upstream]
+        dbtNode_upstream = DBTNode()
+
+        dbtNode_upstream.database = all_nodes[upstream]["database"]
+        dbtNode_upstream.schema = all_nodes[upstream]["schema"]
+        if "identifier" in all_nodes[upstream] and load_catalog is False:
+            dbtNode_upstream.name = all_nodes[upstream]["identifier"]
+        else:
+            dbtNode_upstream.name = all_nodes[upstream]["name"]
         upstream_urns.append(
             get_urn_from_dbtNode(
-                upstream_node["database"],
-                upstream_node["schema"],
-                upstream_node["name"],
+                dbtNode_upstream.database,
+                dbtNode_upstream.schema,
+                dbtNode_upstream.name,
                 target_platform,
                 environment,
             )
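For readers skimming the diff, the standalone sketch below (not part of the patch) illustrates the name-versus-identifier rule the change applies in both `extract_dbt_entities` and `get_upstreams`: when the catalog/schemas are not loaded and a dbt node declares an `identifier`, that identifier is used as the dataset name, so the upstream URNs built for model lineage and the URN of the ingested source dataset match. The node dictionaries, the `resolve_name`/`make_urn` helpers, and the URN layout are simplified assumptions for illustration, not the real DataHub helpers.

```python
# Minimal sketch of the patched behavior, under the assumptions stated above.
from typing import Dict


def resolve_name(node: Dict, load_catalog: bool) -> str:
    # Mirror the patch: prefer the identifier only when the catalog is not
    # being loaded and the node actually defines one.
    if "identifier" in node and load_catalog is False:
        return node["identifier"]
    return node["name"]


def make_urn(node: Dict, load_catalog: bool, platform: str, env: str) -> str:
    # Simplified stand-in for get_urn_from_dbtNode(database, schema, name, ...).
    name = resolve_name(node, load_catalog)
    return (
        f"urn:li:dataset:(urn:li:dataPlatform:{platform},"
        f"{node['database']}.{node['schema']}.{name},{env})"
    )


if __name__ == "__main__":
    # A dbt source whose dbt-side name differs from the physical table it points at.
    source_node = {
        "database": "analytics",
        "schema": "raw",
        "name": "orders_source",     # dbt-side name
        "identifier": "raw_orders",  # actual table name in the warehouse
    }
    # With load_catalog=False the URN uses "raw_orders"; with load_catalog=True
    # it falls back to the dbt name, as before the patch.
    print(make_urn(source_node, load_catalog=False, platform="postgres", env="PROD"))
    print(make_urn(source_node, load_catalog=True, platform="postgres", env="PROD"))
```

Because both the node itself and its upstream references go through the same rule, the lineage edges emitted for downstream models resolve to the same dataset URN as the ingested source table.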