Ingest DBT tags (#7726)

* added tags

* Added dbt tags ingestion

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-09-27 11:12:53 +05:30 committed by GitHub
parent ada163ab2d
commit 8afbbb8642
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 104 additions and 1 deletions

View File

@ -27,6 +27,10 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createStorageService import ( from metadata.generated.schema.api.services.createStorageService import (
CreateStorageServiceRequest, CreateStorageServiceRequest,
) )
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.location import Location
@ -382,8 +386,31 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
schema_name=self.context.database_schema.name.__root__, schema_name=self.context.database_schema.name.__root__,
table_name=table_name, table_name=table_name,
) )
datamodel = self.get_data_model(table_fqn) datamodel = self.get_data_model(table_fqn)
logger.info("Processing DBT Tags")
dbt_tag_labels = None
if datamodel: if datamodel:
dbt_tag_labels = datamodel.tags
if not dbt_tag_labels:
dbt_tag_labels = []
for column in datamodel.columns:
if column.tags:
dbt_tag_labels.extend(column.tags)
if dbt_tag_labels:
for tag_label in dbt_tag_labels:
yield OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name="DBTTags",
description="",
categoryType="Classification",
),
category_details=CreateTagRequest(
name=tag_label.tagFQN.__root__.split(".")[1],
description="DBT Tags",
),
)
yield DataModelLink( yield DataModelLink(
fqn=table_fqn, fqn=table_fqn,
datamodel=datamodel, datamodel=datamodel,

View File

@ -16,6 +16,10 @@ from datetime import datetime
from typing import Dict, Iterable, List, Optional from typing import Dict, Iterable, List, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestDefinition import ( from metadata.generated.schema.api.tests.createTestDefinition import (
CreateTestDefinitionRequest, CreateTestDefinitionRequest,
@ -27,6 +31,7 @@ from metadata.generated.schema.entity.data.table import (
ModelType, ModelType,
Table, Table,
) )
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.tests.basic import ( from metadata.generated.schema.tests.basic import (
@ -43,6 +48,13 @@ from metadata.generated.schema.tests.testDefinition import (
from metadata.generated.schema.tests.testSuite import TestSuite from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn from metadata.utils import fqn
@ -121,6 +133,24 @@ class DBTMixin:
f"Unable to ingest owner from DBT since no user or team was found with name {dbt_owner}" f"Unable to ingest owner from DBT since no user or team was found with name {dbt_owner}"
) )
dbt_table_tags = mnode.get("tags")
dbt_table_tags_list = None
if dbt_table_tags:
dbt_table_tags_list = [
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
tag_category_name="DBTTags",
tag_name=tag,
),
labelType=LabelType.Automated,
state=State.Confirmed,
source=TagSource.Tag,
)
for tag in dbt_table_tags
] or None
model = DataModel( model = DataModel(
modelType=ModelType.DBT, modelType=ModelType.DBT,
description=description if description else None, description=description if description else None,
@ -130,6 +160,7 @@ class DBTMixin:
columns=columns, columns=columns,
upstream=upstream_nodes, upstream=upstream_nodes,
owner=owner, owner=owner,
tags=dbt_table_tags_list,
) )
model_fqn = fqn.build( model_fqn = fqn.build(
self.metadata, self.metadata,
@ -187,12 +218,32 @@ class DBTMixin:
description = manifest_columns.get(key.lower(), {}).get("description") description = manifest_columns.get(key.lower(), {}).get("description")
if description is None: if description is None:
description = ccolumn.get("comment") description = ccolumn.get("comment")
dbt_column_tags = manifest_columns.get(key.lower(), {}).get("tags")
dbt_column_tags_list = None
if dbt_column_tags:
dbt_column_tags_list = [
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
tag_category_name="DBTTags",
tag_name=tag,
),
labelType=LabelType.Automated,
state=State.Confirmed,
source=TagSource.Tag,
)
for tag in dbt_column_tags
] or None
col = Column( col = Column(
name=col_name, name=col_name,
description=description if description else None, description=description if description else None,
dataType=col_type, dataType=col_type,
dataLength=1, dataLength=1,
ordinalPosition=ccolumn["index"], ordinalPosition=ccolumn["index"],
tags=dbt_column_tags_list,
) )
columns.append(col) columns.append(col)
except Exception as exc: # pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except

View File

@ -516,7 +516,7 @@ class RedshiftSource(CommonDbSourceService):
table_name=view_name, table_name=view_name,
) )
if filter_by_table( if filter_by_table(
self.source_config.tableFilterPattern, table_name=view_fqn self.source_config.tableFilterPattern, table_fqn=view_fqn
): ):
self.status.filter( self.status.filter(
view_fqn, view_fqn,

View File

@ -472,9 +472,26 @@ public class TableRepository extends EntityRepository<Table> {
storeOwner(table, dataModel.getOwner()); storeOwner(table, dataModel.getOwner());
} }
table.setTags(dataModel.getTags());
applyTags(table);
// Carry forward the tags from each data model column to the matching table column
for (Column modelColumn : listOrEmpty(dataModel.getColumns())) {
Column stored =
table.getColumns().stream()
.filter(c -> EntityUtil.columnNameMatch.test(c, modelColumn))
.findAny()
.orElse(null);
if (stored == null) {
continue;
}
stored.setTags(modelColumn.getTags());
}
applyTags(table.getColumns());
dao.update(table.getId(), JsonUtils.pojoToJson(table)); dao.update(table.getId(), JsonUtils.pojoToJson(table));
setFieldsInternal(table, new Fields(List.of(FIELD_OWNER), FIELD_OWNER)); setFieldsInternal(table, new Fields(List.of(FIELD_OWNER), FIELD_OWNER));
setFieldsInternal(table, new Fields(List.of(FIELD_TAGS), FIELD_TAGS));
return table; return table;
} }

View File

@ -642,6 +642,14 @@
"$ref": "../../type/entityReference.json", "$ref": "../../type/entityReference.json",
"default": null "default": null
}, },
"tags": {
"description": "Tags for this data model.",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
},
"default": null
},
"columns": { "columns": {
"description": "Columns from the schema defined during modeling. In case of DBT, the metadata here comes from `schema.yaml`.", "description": "Columns from the schema defined during modeling. In case of DBT, the metadata here comes from `schema.yaml`.",
"type": "array", "type": "array",