From 8afbbb8642a46a8f9b8dbca57dea6851226f3e9f Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Tue, 27 Sep 2022 11:12:53 +0530 Subject: [PATCH] Ingest DBT tags (#7726) * added tags * Added dbt tags ingestion Co-authored-by: Onkar Ravgan --- .../source/database/database_service.py | 27 ++++++++++ .../ingestion/source/database/dbt_source.py | 51 +++++++++++++++++++ .../ingestion/source/database/redshift.py | 2 +- .../service/jdbi3/TableRepository.java | 17 +++++++ .../json/schema/entity/data/table.json | 8 +++ 5 files changed, 104 insertions(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 28363f22186..4dfbdf1d247 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -27,6 +27,10 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.services.createStorageService import ( CreateStorageServiceRequest, ) +from metadata.generated.schema.api.tags.createTag import CreateTagRequest +from metadata.generated.schema.api.tags.createTagCategory import ( + CreateTagCategoryRequest, +) from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.location import Location @@ -382,8 +386,31 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC): schema_name=self.context.database_schema.name.__root__, table_name=table_name, ) + datamodel = self.get_data_model(table_fqn) + + logger.info("Processing DBT Tags") + dbt_tag_labels = None if datamodel: + dbt_tag_labels = datamodel.tags + if not dbt_tag_labels: + dbt_tag_labels = [] + for column in datamodel.columns: + if column.tags: + dbt_tag_labels.extend(column.tags) + if dbt_tag_labels: + for 
tag_label in dbt_tag_labels: + yield OMetaTagAndCategory( + category_name=CreateTagCategoryRequest( + name="DBTTags", + description="", + categoryType="Classification", + ), + category_details=CreateTagRequest( + name=tag_label.tagFQN.__root__.split(".")[1], + description="DBT Tags", + ), + ) yield DataModelLink( fqn=table_fqn, datamodel=datamodel, diff --git a/ingestion/src/metadata/ingestion/source/database/dbt_source.py b/ingestion/src/metadata/ingestion/source/database/dbt_source.py index e52779ff109..67d816ef9bb 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt_source.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt_source.py @@ -16,6 +16,10 @@ from datetime import datetime from typing import Dict, Iterable, List, Optional from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.api.tags.createTag import CreateTagRequest +from metadata.generated.schema.api.tags.createTagCategory import ( + CreateTagCategoryRequest, +) from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.api.tests.createTestDefinition import ( CreateTestDefinitionRequest, @@ -27,6 +31,7 @@ from metadata.generated.schema.entity.data.table import ( ModelType, Table, ) +from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.tests.basic import ( @@ -43,6 +48,13 @@ from metadata.generated.schema.tests.testDefinition import ( from metadata.generated.schema.tests.testSuite import TestSuite from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.tagLabel import ( + LabelType, + State, + TagLabel, + TagSource, +) +from metadata.ingestion.models.ometa_tag_category import 
OMetaTagAndCategory from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.utils import fqn @@ -121,6 +133,24 @@ class DBTMixin: f"Unable to ingest owner from DBT since no user or team was found with name {dbt_owner}" ) + dbt_table_tags = mnode.get("tags") + dbt_table_tags_list = None + if dbt_table_tags: + dbt_table_tags_list = [ + TagLabel( + tagFQN=fqn.build( + self.metadata, + entity_type=Tag, + tag_category_name="DBTTags", + tag_name=tag, + ), + labelType=LabelType.Automated, + state=State.Confirmed, + source=TagSource.Tag, + ) + for tag in dbt_table_tags + ] or None + model = DataModel( modelType=ModelType.DBT, description=description if description else None, @@ -130,6 +160,7 @@ class DBTMixin: columns=columns, upstream=upstream_nodes, owner=owner, + tags=dbt_table_tags_list, ) model_fqn = fqn.build( self.metadata, @@ -187,12 +218,32 @@ class DBTMixin: description = manifest_columns.get(key.lower(), {}).get("description") if description is None: description = ccolumn.get("comment") + + dbt_column_tags = manifest_columns.get(key.lower(), {}).get("tags") + dbt_column_tags_list = None + if dbt_column_tags: + dbt_column_tags_list = [ + TagLabel( + tagFQN=fqn.build( + self.metadata, + entity_type=Tag, + tag_category_name="DBTTags", + tag_name=tag, + ), + labelType=LabelType.Automated, + state=State.Confirmed, + source=TagSource.Tag, + ) + for tag in dbt_column_tags + ] or None + col = Column( name=col_name, description=description if description else None, dataType=col_type, dataLength=1, ordinalPosition=ccolumn["index"], + tags=dbt_column_tags_list, ) columns.append(col) except Exception as exc: # pylint: disable=broad-except diff --git a/ingestion/src/metadata/ingestion/source/database/redshift.py b/ingestion/src/metadata/ingestion/source/database/redshift.py index ad728ad40dd..9a57fa7af9a 100644 --- 
a/ingestion/src/metadata/ingestion/source/database/redshift.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift.py @@ -516,7 +516,7 @@ class RedshiftSource(CommonDbSourceService): table_name=view_name, ) if filter_by_table( - self.source_config.tableFilterPattern, table_name=view_fqn + self.source_config.tableFilterPattern, table_fqn=view_fqn ): self.status.filter( view_fqn, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index 9734fd78e94..522153fad8a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -472,9 +472,26 @@ public class TableRepository extends EntityRepository { storeOwner(table, dataModel.getOwner()); } + table.setTags(dataModel.getTags()); + applyTags(table); + + // Carry forward the column tags from the model to the matching table columns + for (Column modelColumn : listOrEmpty(dataModel.getColumns())) { + Column stored = + table.getColumns().stream() + .filter(c -> EntityUtil.columnNameMatch.test(c, modelColumn)) + .findAny() + .orElse(null); + if (stored == null) { + continue; + } + stored.setTags(modelColumn.getTags()); + } + applyTags(table.getColumns()); dao.update(table.getId(), JsonUtils.pojoToJson(table)); setFieldsInternal(table, new Fields(List.of(FIELD_OWNER), FIELD_OWNER)); + setFieldsInternal(table, new Fields(List.of(FIELD_TAGS), FIELD_TAGS)); return table; } diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json index 220ed78bf9a..d870ac99397 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json @@ -642,6 +642,14 @@ "$ref": 
"../../type/entityReference.json", "default": null }, + "tags": { + "description": "Tags for this data model.", + "type": "array", + "items": { + "$ref": "../../type/tagLabel.json" + }, + "default": null + }, "columns": { "description": "Columns from the schema defined during modeling. In case of DBT, the metadata here comes from `schema.yaml`.", "type": "array",