fix(ingest): fix lineage after dbt metadata ingestion when table name and identifier differ (#2596)

Remi 2021-05-25 19:59:35 -06:00 committed by GitHub
parent d23c58734b
commit 6aa133f99c
2 changed files with 33 additions and 16 deletions


@@ -53,7 +53,7 @@ We use a plugin architecture so that you can install only the dependencies you a
 | lookml | `pip install 'acryl-datahub[lookml]'` | LookML source, requires Python 3.7+ |
 | kafka | `pip install 'acryl-datahub[kafka]'` | Kafka source |
 | druid | `pip install 'acryl-datahub[druid]'` | Druid Source |
-| dbt | _no additional dependencies_ | DBT source |
+| dbt | _no additional dependencies_ | dbt source |
 | datahub-rest | `pip install 'acryl-datahub[datahub-rest]'` | DataHub sink over REST API |
 | datahub-kafka | `pip install 'acryl-datahub[datahub-kafka]'` | DataHub sink over Kafka |
@@ -595,19 +595,19 @@ source:
     filename: ./path/to/mce/file.json
 ```
-### DBT `dbt`
+### dbt `dbt`
-Pull metadata from DBT output files:
+Pull metadata from dbt artifact files:
 - [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
   - This file contains model, source and lineage data.
 - [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
   - This file contains schema data.
-  - DBT does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models
+  - dbt does not record schema data for Ephemeral models; as such, DataHub will show Ephemeral models in the lineage, but there will be no associated schema for them
 - target_platform:
-  - The data platform you are enriching with DBT metadata.
+  - The data platform you are enriching with dbt metadata.
   - [data platforms](https://github.com/linkedin/datahub/blob/master/gms/impl/src/main/resources/DataPlatformInfo.json)
-- load_schema:
+- load_schemas:
   - Load schemas from dbt catalog file, not necessary when the underlying data platform already has this data.
 ```yml
@@ -616,10 +616,12 @@ source:
   config:
     manifest_path: "./path/dbt/manifest_file.json"
     catalog_path: "./path/dbt/catalog_file.json"
-    target_platform: "postgres" # optional eg postgres, snowflake etc.
-    load_schema: True / False
+    target_platform: "postgres" # optional, e.g. "postgres", "snowflake", etc.
+    load_schemas: True or False
 ```
+Note: when `load_schemas` is False, models that use [identifiers](https://docs.getdbt.com/reference/resource-properties/identifier) to reference their source tables are ingested using the model identifier as the model name to preserve the lineage.
 ### Kafka Connect `kafka-connect`
 Extracts:
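A minimal sketch of the naming rule in the note above, using simplified node dictionaries rather than the real manifest objects; the `resolve_node_name` helper is hypothetical, for illustration only:

```python
# Hypothetical helper illustrating the note above: when schemas are not loaded
# from the catalog, a node that declares an `identifier` is named by that
# identifier, so the emitted dataset matches the physical table name.
def resolve_node_name(node: dict, load_schemas: bool) -> str:
    if "identifier" in node and not load_schemas:
        return node["identifier"]
    return node["name"]


source = {"name": "orders", "identifier": "raw_orders"}
print(resolve_node_name(source, load_schemas=False))  # raw_orders
print(resolve_node_name(source, load_schemas=True))   # orders
```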


@@ -58,7 +58,7 @@ class DBTNode:
     dbt_file_path: str
     node_type: str # source, model
     materialization: str # table, view, ephemeral
-    name: str
+    name: str # name, identifier
     columns: List[DBTColumn]
     upstream_urns: List[str]
     datahub_urn: str
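For readers skimming the diff, here is a rough reconstruction of the `DBTNode` container from the fields visible in this hunk; the types and defaults are assumptions, and the real class in dbt.py may differ:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class DBTColumn:
    # Stand-in for the real column container; only the name is sketched here.
    name: str = ""


@dataclass
class DBTNode:
    # Fields as they appear in this diff; defaults are added so that DBTNode()
    # can be constructed empty and filled in, as the ingestion code below does.
    database: str = ""
    schema: str = ""
    dbt_name: str = ""
    dbt_file_path: str = ""
    node_type: str = ""        # source, model
    materialization: str = ""  # table, view, ephemeral
    name: str = ""             # name, identifier
    columns: List[DBTColumn] = field(default_factory=list)
    upstream_urns: List[str] = field(default_factory=list)
    datahub_urn: str = ""
```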
@@ -98,17 +98,24 @@ def extract_dbt_entities(
         dbtNode = DBTNode()
         dbtNode.dbt_name = key
         dbtNode.node_type = node["resource_type"]
-        dbtNode.name = node["name"]
         dbtNode.database = node["database"]
         dbtNode.schema = node["schema"]
         dbtNode.dbt_file_path = node["original_file_path"]
         dbtNode.node_type = node["resource_type"]
+        if "identifier" in node and load_catalog is False:
+            dbtNode.name = node["identifier"]
+        else:
+            dbtNode.name = node["name"]
         if "materialized" in node["config"].keys():
             # It's a model
             dbtNode.materialization = node["config"]["materialized"]
             dbtNode.upstream_urns = get_upstreams(
-                node["depends_on"]["nodes"], nodes, target_platform, environment
+                node["depends_on"]["nodes"],
+                nodes,
+                load_catalog,
+                target_platform,
+                environment,
             )
         else:
             # It's a source
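The hunk below feeds the resolved database, schema, and name into `get_urn_from_dbtNode`. That helper is not part of this diff; the following is only a hedged sketch of what such a function could look like, and the URN template is an assumption rather than code taken from dbt.py:

```python
def get_urn_from_dbtNode(
    database: str, schema: str, name: str, target_platform: str, environment: str
) -> str:
    # Assumed dataset URN template; the real helper elsewhere in dbt.py may
    # differ in casing, quoting, or separator handling.
    dataset_name = f"{database}.{schema}.{name}"
    return (
        f"urn:li:dataset:(urn:li:dataPlatform:{target_platform},"
        f"{dataset_name},{environment})"
    )
```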
@@ -190,19 +197,27 @@ def get_custom_properties(node: DBTNode) -> Dict[str, str]:
 def get_upstreams(
     upstreams: List[str],
     all_nodes: Dict[str, dict],
+    load_catalog: bool,
     target_platform: str,
     environment: str,
 ) -> List[str]:
     upstream_urns = []
     for upstream in upstreams:
-        upstream_node = all_nodes[upstream]
+        dbtNode_upstream = DBTNode()
+        dbtNode_upstream.database = all_nodes[upstream]["database"]
+        dbtNode_upstream.schema = all_nodes[upstream]["schema"]
+        if "identifier" in all_nodes[upstream] and load_catalog is False:
+            dbtNode_upstream.name = all_nodes[upstream]["identifier"]
+        else:
+            dbtNode_upstream.name = all_nodes[upstream]["name"]
         upstream_urns.append(
             get_urn_from_dbtNode(
-                upstream_node["database"],
-                upstream_node["schema"],
-                upstream_node["name"],
+                dbtNode_upstream.database,
+                dbtNode_upstream.schema,
+                dbtNode_upstream.name,
                 target_platform,
                 environment,
             )
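To see what threading `load_catalog` into `get_upstreams` buys, here is a self-contained sketch, using a simplified node dict and an assumed URN template rather than the real ingestion code: with the identifier fallback, the URN built for a source, and for any lineage edge pointing at it, uses the physical table name instead of the dbt name.

```python
# Simplified manifest node: the physical table name (identifier) differs from
# the dbt name. Both the node itself and any upstream reference to it now
# resolve to the identifier when schemas are not loaded from the catalog.
source_node = {
    "database": "analytics",
    "schema": "raw",
    "name": "orders",
    "identifier": "raw_orders",
}


def urn_for(node: dict, load_catalog: bool, platform: str = "postgres", env: str = "PROD") -> str:
    # Same identifier fallback as the patched code; the URN template is an assumption.
    name = node["identifier"] if "identifier" in node and not load_catalog else node["name"]
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{node['database']}.{node['schema']}.{name},{env})"


# Before the fix, the name was always taken from node["name"], so lineage
# pointed at ...analytics.raw.orders even though the table ingested from the
# warehouse is known as analytics.raw.raw_orders, and the edge never lined up.
print(urn_for(source_node, load_catalog=True))   # ...,analytics.raw.orders,PROD)
print(urn_for(source_node, load_catalog=False))  # ...,analytics.raw.raw_orders,PROD)
```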