mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-24 05:58:31 +00:00
* Added dbt glossary and tier ingestion * changed elif to if condition:pylint * Added dbtMeta model for glossary and tier * removed source * Optimization: Reused the dtb_tags * pylint format * py_format * Removed unnecessary models and minor changes * Removed empty string assignment
This commit is contained in:
parent
47f0d99333
commit
6915c1a1b6
@ -21,6 +21,7 @@ from metadata.generated.schema.api.tests.createTestDefinition import (
|
||||
CreateTestDefinitionRequest,
|
||||
)
|
||||
from metadata.generated.schema.entity.classification.tag import Tag
|
||||
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
Column,
|
||||
DataModel,
|
||||
@ -83,6 +84,7 @@ from metadata.ingestion.source.database.dbt.dbt_utils import (
|
||||
get_dbt_model_name,
|
||||
get_dbt_raw_query,
|
||||
)
|
||||
from metadata.ingestion.source.database.dbt.models import DbtMeta
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.elasticsearch import get_entity_from_es_result
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
@ -389,7 +391,7 @@ class DbtSource(DbtServiceSource):
|
||||
if dbt_objects.dbt_catalog:
|
||||
catalog_node = catalog_entities.get(key)
|
||||
|
||||
dbt_table_tags_list = None
|
||||
dbt_table_tags_list = []
|
||||
if manifest_node.tags:
|
||||
dbt_table_tags_list = get_tag_labels(
|
||||
metadata=self.metadata,
|
||||
@ -398,6 +400,11 @@ class DbtSource(DbtServiceSource):
|
||||
include_tags=self.source_config.includeTags,
|
||||
)
|
||||
|
||||
if manifest_node.meta:
|
||||
dbt_table_tags_list.extend(
|
||||
self.process_dbt_meta(manifest_node.meta) or []
|
||||
)
|
||||
|
||||
dbt_compiled_query = get_dbt_compiled_query(manifest_node)
|
||||
dbt_raw_query = get_dbt_raw_query(manifest_node)
|
||||
|
||||
@ -445,7 +452,7 @@ class DbtSource(DbtServiceSource):
|
||||
manifest_node=manifest_node,
|
||||
catalog_node=catalog_node,
|
||||
),
|
||||
tags=dbt_table_tags_list,
|
||||
tags=dbt_table_tags_list or None,
|
||||
),
|
||||
)
|
||||
yield Either(right=data_model_link)
|
||||
@ -548,6 +555,34 @@ class DbtSource(DbtServiceSource):
|
||||
if catalog_column and catalog_column.comment:
|
||||
column_description = catalog_column.comment
|
||||
|
||||
dbt_column_tag_list = []
|
||||
dbt_column_tag_list.extend(
|
||||
get_tag_labels(
|
||||
metadata=self.metadata,
|
||||
tags=manifest_column.tags,
|
||||
classification_name=self.tag_classification_name,
|
||||
include_tags=self.source_config.includeTags,
|
||||
)
|
||||
or []
|
||||
)
|
||||
|
||||
if manifest_column.meta:
|
||||
dbt_column_meta = DbtMeta(**manifest_column.meta)
|
||||
logger.debug(f"Processing DBT column glossary: {key}")
|
||||
if (
|
||||
dbt_column_meta.openmetadata
|
||||
and dbt_column_meta.openmetadata.glossary
|
||||
):
|
||||
dbt_column_tag_list.extend(
|
||||
get_tag_labels(
|
||||
metadata=self.metadata,
|
||||
tags=dbt_column_meta.openmetadata.glossary,
|
||||
include_tags=self.source_config.includeTags,
|
||||
tag_type=GlossaryTerm,
|
||||
)
|
||||
or []
|
||||
)
|
||||
|
||||
columns.append(
|
||||
Column(
|
||||
name=column_name,
|
||||
@ -563,12 +598,7 @@ class DbtSource(DbtServiceSource):
|
||||
ordinalPosition=catalog_column.index
|
||||
if catalog_column
|
||||
else None,
|
||||
tags=get_tag_labels(
|
||||
metadata=self.metadata,
|
||||
tags=manifest_column.tags,
|
||||
classification_name=self.tag_classification_name,
|
||||
include_tags=self.source_config.includeTags,
|
||||
),
|
||||
tags=dbt_column_tag_list or None,
|
||||
)
|
||||
)
|
||||
logger.debug(f"Successfully processed DBT column: {key}")
|
||||
@ -674,6 +704,42 @@ class DbtSource(DbtServiceSource):
|
||||
)
|
||||
)
|
||||
|
||||
def process_dbt_meta(self, manifest_meta):
|
||||
"""
|
||||
Method to process DBT meta for Tags and GlossaryTerms
|
||||
"""
|
||||
dbt_table_tags_list = []
|
||||
try:
|
||||
dbt_meta_info = DbtMeta(**manifest_meta)
|
||||
if dbt_meta_info.openmetadata and dbt_meta_info.openmetadata.glossary:
|
||||
dbt_table_tags_list.extend(
|
||||
get_tag_labels(
|
||||
metadata=self.metadata,
|
||||
tags=dbt_meta_info.openmetadata.glossary,
|
||||
include_tags=self.source_config.includeTags,
|
||||
tag_type=GlossaryTerm,
|
||||
)
|
||||
or []
|
||||
)
|
||||
|
||||
if dbt_meta_info.openmetadata and dbt_meta_info.openmetadata.tier:
|
||||
tier_fqn = dbt_meta_info.openmetadata.tier
|
||||
dbt_table_tags_list.extend(
|
||||
get_tag_labels(
|
||||
metadata=self.metadata,
|
||||
tags=[tier_fqn.split(fqn.FQN_SEPARATOR)[-1]],
|
||||
classification_name=tier_fqn.split(fqn.FQN_SEPARATOR)[0],
|
||||
include_tags=self.source_config.includeTags,
|
||||
)
|
||||
or []
|
||||
)
|
||||
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Failed to process meta dbt Tags and GlossaryTerms: {exc}")
|
||||
|
||||
return dbt_table_tags_list or []
|
||||
|
||||
def process_dbt_descriptions(self, data_model_link: DataModelLink):
|
||||
"""
|
||||
Method to process DBT descriptions using patch APIs
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
Models required for dbt
|
||||
"""
|
||||
|
||||
from typing import Any, Optional
|
||||
from typing import Any, List, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
@ -33,3 +33,12 @@ class DbtFilteredModel(BaseModel):
|
||||
is_filtered: Optional[bool] = False
|
||||
message: Optional[str]
|
||||
model_fqn: Optional[str]
|
||||
|
||||
|
||||
class DbtMetaGlossaryTier(BaseModel):
|
||||
tier: Optional[str]
|
||||
glossary: Optional[List[str]]
|
||||
|
||||
|
||||
class DbtMeta(BaseModel):
|
||||
openmetadata: Optional[DbtMetaGlossaryTier]
|
||||
|
||||
@ -14,13 +14,14 @@ Tag utils Module
|
||||
|
||||
import functools
|
||||
import traceback
|
||||
from typing import Iterable, List, Optional
|
||||
from typing import Iterable, List, Optional, Union
|
||||
|
||||
from metadata.generated.schema.api.classification.createClassification import (
|
||||
CreateClassificationRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
|
||||
from metadata.generated.schema.entity.classification.tag import Tag
|
||||
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
|
||||
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
|
||||
StackTraceError,
|
||||
)
|
||||
@ -82,29 +83,41 @@ def get_ometa_tag_and_classification(
|
||||
|
||||
@functools.lru_cache(maxsize=512)
|
||||
def get_tag_label(
|
||||
metadata: OpenMetadata, tag_name: str, classification_name: str
|
||||
metadata: OpenMetadata,
|
||||
tag_name: str,
|
||||
classification_name: str,
|
||||
tag_type: Union[Tag, GlossaryTerm] = Tag,
|
||||
) -> Optional[TagLabel]:
|
||||
"""
|
||||
Returns the tag label if the tag is created
|
||||
"""
|
||||
try:
|
||||
# Build the tag FQN
|
||||
tag_fqn = fqn.build(
|
||||
metadata,
|
||||
Tag,
|
||||
classification_name=classification_name,
|
||||
tag_name=tag_name,
|
||||
)
|
||||
if tag_type == Tag:
|
||||
# Build the tag FQN
|
||||
tag_fqn = fqn.build(
|
||||
metadata,
|
||||
tag_type,
|
||||
classification_name=classification_name,
|
||||
tag_name=tag_name,
|
||||
)
|
||||
source = TagSource.Classification.value
|
||||
|
||||
if tag_type == GlossaryTerm:
|
||||
tag_fqn = tag_name
|
||||
source = TagSource.Glossary.value
|
||||
|
||||
# Check if the tag exists
|
||||
tag = metadata.get_by_name(entity=Tag, fqn=tag_fqn)
|
||||
tag = metadata.get_by_name(entity=tag_type, fqn=tag_fqn)
|
||||
if tag:
|
||||
return TagLabel(
|
||||
tagFQN=tag_fqn,
|
||||
labelType=LabelType.Automated.value,
|
||||
state=State.Suggested.value,
|
||||
source=TagSource.Classification.value,
|
||||
source=source,
|
||||
)
|
||||
|
||||
logger.warning(f"Tag does not exist: {tag_fqn}")
|
||||
|
||||
except Exception as err:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(f"Error processing tag label: {err}")
|
||||
@ -114,8 +127,9 @@ def get_tag_label(
|
||||
def get_tag_labels(
|
||||
metadata: OpenMetadata,
|
||||
tags: List[str],
|
||||
classification_name: str,
|
||||
classification_name: Optional[str],
|
||||
include_tags: bool = True,
|
||||
tag_type: Union[Tag, GlossaryTerm] = Tag,
|
||||
) -> Optional[List[TagLabel]]:
|
||||
"""
|
||||
Method to create tag labels from the collected tags
|
||||
@ -125,7 +139,10 @@ def get_tag_labels(
|
||||
for tag in tags:
|
||||
try:
|
||||
tag_label = get_tag_label(
|
||||
metadata, tag_name=tag, classification_name=classification_name
|
||||
metadata,
|
||||
tag_name=tag,
|
||||
classification_name=classification_name,
|
||||
tag_type=tag_type,
|
||||
)
|
||||
if tag_label:
|
||||
tag_labels_list.append(tag_label)
|
||||
|
||||
@ -0,0 +1,185 @@
|
||||
---
|
||||
title: Ingest Glossary from dbt
|
||||
slug: /connectors/ingestion/workflows/dbt/ingest-dbt-glossary
|
||||
---
|
||||
|
||||
# Ingest Glossary from dbt
|
||||
|
||||
Ingest the table and column level glossary terms from `manifest.json` file
|
||||
|
||||
## Requirements
|
||||
|
||||
{% note %}
|
||||
|
||||
For dbt Glossary, Glossary terms must be created or present in OpenMetadata beforehand for data ingestion to work.
|
||||
|
||||
{% /note %}
|
||||
|
||||
## Steps for ingesting dbt Glossary
|
||||
|
||||
### 1. Create a Glosary at OpenMetadata or Select a previously added glossary
|
||||
A Glossary Term is a preferred terminology for a concept. In a Glossary term, you can add tags, synonyms, related terms to build a conceptual semantic graph, and also add reference links.
|
||||
|
||||
For details on creating glossary terms, refer to the [OpenMetadata documentation](https://docs.open-metadata.org/v1.3.x/how-to-guides/data-governance/glossary/create-terms)
|
||||
|
||||
To view created Glossary Terms, navigate to the Glossary section within OpenMetadata `govern->glossary->glossary_name->glossary_term_name`
|
||||
|
||||
OpenMetadata also supports creating nested Glossary Terms, allowing you to organize them hierarchically and seamlessly ingest them into dbt.
|
||||
|
||||
{% image
|
||||
src="/images/v1.3//features/ingestion/workflows/dbt/dbt-features/dbt-glossary-term.webp"
|
||||
alt="Openmetadata_glossary_term"
|
||||
caption="OpenMetadata Glossaries"
|
||||
/%}
|
||||
|
||||
|
||||
### 2. Add Table-Level Glossary term information in schema.yml file
|
||||
|
||||
To associate glossary terms with specific tables in your dbt model, you'll need their Fully Qualified Names (FQNs) within OpenMetadata.
|
||||
|
||||
#### Steps to Get Glossary Term FQNs:
|
||||
1. Navigate to the desired glossary term in OpenMetadata's glossary section.
|
||||
2. The glossary term's details page will display its FQN e.g. `Glossary_name.glossary_term` in the url like `your-uri/glossary/Glossary_name.glossary_term`.
|
||||
|
||||
#### Example
|
||||
Suppose you want to add the glossary terms `term_one` (FQN: `Test_Glossary.term_one`) and `more_nested_term` (FQN: `Test_Glossary.term_two.nested_term.more_nested_term`) to the customers table in your dbt model.
|
||||
|
||||
To get FQN for `term_one` (`Test_Glossary.term_one`), navigate to `govern->glossary->Test_Glossary->term_one`.
|
||||
|
||||
And for `more_nested_term` (`Test_Glossary.term_two.nested_term.more_nested_term`), navigate to `govern->glossary->Test_Glossary->term_two->nested_term->more_nested_term`.
|
||||
|
||||
you can see the current url containing the glossary term FQNs as **https://localhost:8585/glossary/`Test_Glossary.term_two.nested_term.more_nested_term`**
|
||||
|
||||
{% image
|
||||
src="/images/v1.3//features/ingestion/workflows/dbt/dbt-features/dbt-glossary-fqn.webp"
|
||||
alt="Openmetadata_glossary_term"
|
||||
caption="OpenMetadata Glossary Term - term_one"
|
||||
/%}
|
||||
|
||||
{% image
|
||||
src="/images/v1.3//features/ingestion/workflows/dbt/dbt-features/dbt-glossary-nested-fqn.webp"
|
||||
alt="Openmetadata_glossary_term"
|
||||
caption="OpenMetadata Glossary Term - more_nested_term"
|
||||
/%}
|
||||
|
||||
In your dbt schema.yml file for the `customers` table model, add the Glossary Term FQNs under `model->name->meta->openmetadata->glossary`
|
||||
The format should be a list of strings, like this: ` [ 'Test_Glossary.term_one', 'Test_Glossary.term_two.nested_term.more_nested_term' ]`.
|
||||
|
||||
For details on dbt meta follow the link [here](https://docs.getdbt.com/reference/resource-configs/meta)
|
||||
|
||||
```yml
|
||||
models:
|
||||
- name: customers
|
||||
meta:
|
||||
openmetadata:
|
||||
glossary: [
|
||||
'Test_Glossary.term_one',
|
||||
'Test_Glossary.term_two.nested_term.more_nested_term',
|
||||
]
|
||||
description: This table has basic information about a customer, as well as some derived facts based on a customer's orders
|
||||
|
||||
columns:
|
||||
- name: customer_id
|
||||
description: This is a unique identifier for a customer
|
||||
tests:
|
||||
- unique
|
||||
- not_null
|
||||
```
|
||||
|
||||
After adding the Glossary term information to your schema.yml file, run your dbt workflow.
|
||||
The generated `manifest.json` file will then include the FQNs under `node_name->meta->openmetadata->glossary` as `[ 'Test_Glossary.term_one', 'Test_Glossary.term_two.nested_term.more_nested_term' ]`
|
||||
|
||||
```json
|
||||
"model.jaffle_shop.customers": {
|
||||
"raw_sql": "sample_raw_sql",
|
||||
"compiled": true,
|
||||
"resource_type": "model",
|
||||
"depends_on": {},
|
||||
"database": "dev",
|
||||
"schema": "dbt_jaffle",
|
||||
"config": {
|
||||
"enabled": true,
|
||||
"alias": null,
|
||||
"meta": {
|
||||
"openmetadata": {
|
||||
"glossary": [
|
||||
"Test_Glossary.term_one",
|
||||
"Test_Glossary.term_two.nested_term.more_nested_term"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3. Add Column-Level Glossary term information in `schema.yml` file
|
||||
|
||||
To associate a glossary term with a specific column in your dbt model, follow these steps:
|
||||
|
||||
1. Locate the `customer_id` column within the `customers` table model in your `schema.yml` file.
|
||||
2. Under the `customer_id` column definition, add the glossary term FQNs under `model->name->columns->column_name->meta->openmetadata->glossary` as ` [ 'Test_Glossary.term_two.nested_term' ]`.
|
||||
|
||||
```yml
|
||||
models:
|
||||
- name: customers
|
||||
meta:
|
||||
openmetadata:
|
||||
glossary: [
|
||||
'Test_Glossary.term_one',
|
||||
'Test_Glossary.term_two.nested_term.more_nested_term',
|
||||
]
|
||||
description: This table has basic information about a customer, as well as some derived facts based on a customer's orders
|
||||
|
||||
columns:
|
||||
- name: customer_id
|
||||
description: This is a unique identifier for a customer
|
||||
meta:
|
||||
openmetadata:
|
||||
glossary: [
|
||||
'Test_Glossary.term_two.nested_term'
|
||||
]
|
||||
tests:
|
||||
- unique
|
||||
- not_null
|
||||
```
|
||||
|
||||
|
||||
After adding the Glossary term information to your schema.yml file, run your dbt workflow.
|
||||
The generated `manifest.json` file will then include the FQNs under `node_name->columns->column_name->meta->openmetadata->glossary` as `[ 'Test_Glossary.term_two.nested_term' ]`
|
||||
|
||||
```json
|
||||
"model.jaffle_shop.customers": {
|
||||
"raw_sql": "sample_raw_sql",
|
||||
"compiled": true,
|
||||
"resource_type": "model",
|
||||
"depends_on": {},
|
||||
"database": "dev",
|
||||
"schema": "dbt_jaffle",
|
||||
"columns": {
|
||||
"customer_id": {
|
||||
"name": "customer_id",
|
||||
"description": "This is a unique identifier for a customer",
|
||||
"meta": {
|
||||
"openmetadata": {
|
||||
"glossary": [
|
||||
"Test_Glossary.term_two.nested_term"
|
||||
]
|
||||
}
|
||||
},
|
||||
"data_type": null,
|
||||
"constraints": [],
|
||||
"quote": null,
|
||||
"tags": []
|
||||
},
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 4. Viewing the Glossary term on tables and columns
|
||||
Table and Column level Glossary term ingested from dbt can be viewed on the node in OpenMetadata
|
||||
|
||||
{% image
|
||||
src="/images/v1.3//features/ingestion/workflows/dbt/dbt-features/dbt-glossary.webp"
|
||||
alt="dbt_glossary"
|
||||
caption="dbt Glossary term"
|
||||
/%}
|
||||
@ -0,0 +1,80 @@
|
||||
---
|
||||
title: Ingest Tiers from dbt
|
||||
slug: /connectors/ingestion/workflows/dbt/ingest-dbt-tier
|
||||
---
|
||||
|
||||
# Ingest Tiers from dbt
|
||||
|
||||
Ingest the table-level tier from `manifest.json` file
|
||||
|
||||
## Requirements
|
||||
|
||||
{% note %}
|
||||
|
||||
For dbt Tier, Tiers must be created or present in OpenMetadata beforehand for data ingestion to work.
|
||||
|
||||
{% /note %}
|
||||
|
||||
## Steps for ingesting dbt Tier
|
||||
|
||||
### 1. Add a Tier at OpenMetadata or Select a Tier
|
||||
Tiering is an important concept of data classification in OpenMetadata. Tiers should be based on the importance of data. Using Tiers, data producers or owners can define the importance of data to an organization.
|
||||
|
||||
For details on adding or selecting tiers, refer to the [OpenMetadata documentation](https://docs.open-metadata.org/v1.3.x/how-to-guides/data-governance/classification/tiers#what-are-tiers)
|
||||
|
||||
|
||||
### 2. Add Table-Level Tier information in schema.yml file
|
||||
Suppose you want to add the Tier `Tier2` to a table model `customers`.
|
||||
|
||||
Go to your schema.yml file at dbt containing the table model information `customers` and add the tier FQN under `model->name->meta->openmetadata->tier` as `Tier.Tier2`.
|
||||
|
||||
For more details on dbt meta field follow the link [here](https://docs.getdbt.com/reference/resource-configs/meta)
|
||||
|
||||
```yml
|
||||
models:
|
||||
- name: customers
|
||||
meta:
|
||||
openmetadata:
|
||||
tier: 'Tier.Tier2'
|
||||
|
||||
description: This table has basic information about a customer, as well as some derived facts based on a customer's orders
|
||||
|
||||
columns:
|
||||
- name: customer_id
|
||||
description: This is a unique identifier for a customer
|
||||
tests:
|
||||
- unique
|
||||
- not_null
|
||||
```
|
||||
|
||||
|
||||
After adding the tier information to your `schema.yml` file, run your dbt workflow. The generated `manifest.json` file will then reflect the tier assignment. You'll find it under `node_name->config->meta->openmetadata->tier` as `Tier.Tier2`.
|
||||
|
||||
```json
|
||||
"model.jaffle_shop.customers": {
|
||||
"raw_sql": "sample_raw_sql",
|
||||
"compiled": true,
|
||||
"resource_type": "model",
|
||||
"depends_on": {},
|
||||
"database": "dev",
|
||||
"schema": "dbt_jaffle",
|
||||
"config": {
|
||||
"enabled": true,
|
||||
"alias": null,
|
||||
"meta": {
|
||||
"openmetadata": {
|
||||
"tier": "Tier.Tier2"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 3. Viewing the Tier on tables
|
||||
Table level Tier ingested from dbt can be viewed on the node in OpenMetadata
|
||||
|
||||
{% image
|
||||
src="/images/v1.3//features/ingestion/workflows/dbt/dbt-features/dbt-tier.webp"
|
||||
alt="dbt_tier"
|
||||
caption="dbt Tier"
|
||||
/%}
|
||||
@ -840,6 +840,10 @@ site_menu:
|
||||
url: /connectors/ingestion/workflows/dbt/ingest-dbt-descriptions
|
||||
- category: Connectors / Ingestion / Workflows / dbt / Ingest dbt Tags
|
||||
url: /connectors/ingestion/workflows/dbt/ingest-dbt-tags
|
||||
- category: Connectors / Ingestion / Workflows / dbt / Ingest dbt Tiers
|
||||
url: /connectors/ingestion/workflows/dbt/ingest-dbt-tier
|
||||
- category: Connectors / Ingestion / Workflows / dbt / Ingest dbt Glossary
|
||||
url: /connectors/ingestion/workflows/dbt/ingest-dbt-glossary
|
||||
- category: Connectors / Ingestion / Workflows / dbt / Ingest dbt Lineage
|
||||
url: /connectors/ingestion/workflows/dbt/ingest-dbt-lineage
|
||||
- category: Connectors / Ingestion / Workflows / dbt / Setup Multiple dbt Projects
|
||||
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 35 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 41 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 63 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 87 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 83 KiB |
Loading…
x
Reference in New Issue
Block a user