Make database and DAO changes needed to persist TagLabel.reason

This commit is contained in:
Eugenio Doñaque 2025-09-24 14:28:17 +02:00 committed by Eugenio
parent 1332d0aa06
commit 098da51d07
9 changed files with 121 additions and 39 deletions

View File

@ -43,4 +43,7 @@ CREATE TABLE IF NOT EXISTS notification_template_entity (
UNIQUE KEY fqnHash (fqnHash), UNIQUE KEY fqnHash (fqnHash),
INDEX idx_notification_template_name (name), INDEX idx_notification_template_name (name),
INDEX idx_notification_template_provider (provider) INDEX idx_notification_template_provider (provider)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- Add a nullable free-text column to tag_usage so that TagLabel.reason can be persisted
-- alongside each applied tag. Existing rows get NULL (no default is declared).
ALTER TABLE tag_usage
ADD COLUMN reason TEXT;

View File

@ -46,4 +46,7 @@ CREATE TABLE IF NOT EXISTS notification_template_entity (
); );
CREATE INDEX IF NOT EXISTS idx_notification_template_name ON notification_template_entity(name); CREATE INDEX IF NOT EXISTS idx_notification_template_name ON notification_template_entity(name);
CREATE INDEX IF NOT EXISTS idx_notification_template_provider ON notification_template_entity(provider); CREATE INDEX IF NOT EXISTS idx_notification_template_provider ON notification_template_entity(provider);
-- Add a nullable free-text column to tag_usage so that TagLabel.reason can be persisted
-- alongside each applied tag. Existing rows get NULL (no default is declared).
-- IF NOT EXISTS keeps this statement re-runnable, matching the other statements in this script.
ALTER TABLE tag_usage
ADD COLUMN IF NOT EXISTS reason TEXT;

View File

@ -41,9 +41,10 @@ public class CachedTagUsageDAO implements CollectionDAO.TagUsageDAO {
String tagFQNHash, String tagFQNHash,
String targetFQNHash, String targetFQNHash,
int labelType, int labelType,
int state) { int state,
String reason) {
try { try {
delegate.applyTag(source, tagFQN, tagFQNHash, targetFQNHash, labelType, state); delegate.applyTag(source, tagFQN, tagFQNHash, targetFQNHash, labelType, state, reason);
if (RelationshipCache.isAvailable()) { if (RelationshipCache.isAvailable()) {
invalidateTagCaches(targetFQNHash); invalidateTagCaches(targetFQNHash);
RelationshipCache.bumpTag(tagFQN, 1); RelationshipCache.bumpTag(tagFQN, 1);
@ -437,10 +438,11 @@ public class CachedTagUsageDAO implements CollectionDAO.TagUsageDAO {
List<String> tagFQNHashes, List<String> tagFQNHashes,
List<String> targetFQNHashes, List<String> targetFQNHashes,
List<Integer> labelTypes, List<Integer> labelTypes,
List<Integer> states) { List<Integer> states,
List<String> reasons) {
// This is an internal method that delegates directly to the database // This is an internal method that delegates directly to the database
delegate.applyTagsBatchInternal( delegate.applyTagsBatchInternal(
sources, tagFQNs, tagFQNHashes, targetFQNHashes, labelTypes, states); sources, tagFQNs, tagFQNHashes, targetFQNHashes, labelTypes, states, reasons);
} }
@Override @Override

View File

@ -4357,11 +4357,11 @@ public interface CollectionDAO {
interface TagUsageDAO { interface TagUsageDAO {
@ConnectionAwareSqlUpdate( @ConnectionAwareSqlUpdate(
value = value =
"INSERT IGNORE INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state) VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state)", "INSERT IGNORE INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state, reason) VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state, :reason)",
connectionType = MYSQL) connectionType = MYSQL)
@ConnectionAwareSqlUpdate( @ConnectionAwareSqlUpdate(
value = value =
"INSERT INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state) VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state) ON CONFLICT (source, tagFQNHash, targetFQNHash) DO NOTHING", "INSERT INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state, reason) VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state, :reason) ON CONFLICT (source, tagFQNHash, targetFQNHash) DO NOTHING",
connectionType = POSTGRES) connectionType = POSTGRES)
void applyTag( void applyTag(
@Bind("source") int source, @Bind("source") int source,
@ -4369,7 +4369,8 @@ public interface CollectionDAO {
@BindFQN("tagFQNHash") String tagFQNHash, @BindFQN("tagFQNHash") String tagFQNHash,
@BindFQN("targetFQNHash") String targetFQNHash, @BindFQN("targetFQNHash") String targetFQNHash,
@Bind("labelType") int labelType, @Bind("labelType") int labelType,
@Bind("state") int state); @Bind("state") int state,
@Bind("reason") String reason);
default List<TagLabel> getTags(String targetFQN) { default List<TagLabel> getTags(String targetFQN) {
List<TagLabel> tags = getTagsInternal(targetFQN); List<TagLabel> tags = getTagsInternal(targetFQN);
@ -4401,11 +4402,11 @@ public interface CollectionDAO {
} }
@SqlQuery( @SqlQuery(
"SELECT source, tagFQN, labelType, state FROM tag_usage WHERE targetFQNHash = :targetFQNHash ORDER BY tagFQN") "SELECT source, tagFQN, labelType, state, reason FROM tag_usage WHERE targetFQNHash = :targetFQNHash ORDER BY tagFQN")
List<TagLabel> getTagsInternal(@BindFQN("targetFQNHash") String targetFQNHash); List<TagLabel> getTagsInternal(@BindFQN("targetFQNHash") String targetFQNHash);
@SqlQuery( @SqlQuery(
"SELECT targetFQNHash, source, tagFQN, labelType, state " "SELECT targetFQNHash, source, tagFQN, labelType, state, reason "
+ "FROM tag_usage " + "FROM tag_usage "
+ "WHERE targetFQNHash IN (<targetFQNHashes>) " + "WHERE targetFQNHash IN (<targetFQNHashes>) "
+ "ORDER BY targetFQNHash, tagFQN") + "ORDER BY targetFQNHash, tagFQN")
@ -4415,7 +4416,7 @@ public interface CollectionDAO {
@ConnectionAwareSqlQuery( @ConnectionAwareSqlQuery(
value = value =
"SELECT tu.source, tu.tagFQN, tu.labelType, tu.targetFQNHash, tu.state, " "SELECT tu.source, tu.tagFQN, tu.labelType, tu.targetFQNHash, tu.state, tu.reason, "
+ "CASE " + "CASE "
+ " WHEN tu.source = 1 THEN gterm.json " + " WHEN tu.source = 1 THEN gterm.json "
+ " WHEN tu.source = 0 THEN ta.json " + " WHEN tu.source = 0 THEN ta.json "
@ -4427,7 +4428,7 @@ public interface CollectionDAO {
connectionType = MYSQL) connectionType = MYSQL)
@ConnectionAwareSqlQuery( @ConnectionAwareSqlQuery(
value = value =
"SELECT tu.source, tu.tagFQN, tu.labelType, tu.targetFQNHash, tu.state, " "SELECT tu.source, tu.tagFQN, tu.labelType, tu.targetFQNHash, tu.state, tu.reason, "
+ "CASE " + "CASE "
+ " WHEN tu.source = 1 THEN gterm.json " + " WHEN tu.source = 1 THEN gterm.json "
+ " WHEN tu.source = 0 THEN ta.json " + " WHEN tu.source = 0 THEN ta.json "
@ -4653,7 +4654,8 @@ public interface CollectionDAO {
.withSource(TagLabel.TagSource.values()[r.getInt("source")]) .withSource(TagLabel.TagSource.values()[r.getInt("source")])
.withLabelType(TagLabel.LabelType.values()[r.getInt("labelType")]) .withLabelType(TagLabel.LabelType.values()[r.getInt("labelType")])
.withState(TagLabel.State.values()[r.getInt("state")]) .withState(TagLabel.State.values()[r.getInt("state")])
.withTagFQN(r.getString("tagFQN")); .withTagFQN(r.getString("tagFQN"))
.withReason(r.getString("reason"));
} }
} }
@ -4674,7 +4676,8 @@ public interface CollectionDAO {
.withSource(TagLabel.TagSource.values()[r.getInt("source")]) .withSource(TagLabel.TagSource.values()[r.getInt("source")])
.withLabelType(TagLabel.LabelType.values()[r.getInt("labelType")]) .withLabelType(TagLabel.LabelType.values()[r.getInt("labelType")])
.withState(TagLabel.State.values()[r.getInt("state")]) .withState(TagLabel.State.values()[r.getInt("state")])
.withTagFQN(r.getString("tagFQN")); .withTagFQN(r.getString("tagFQN"))
.withReason(r.getString("reason"));
TagLabel.TagSource source = TagLabel.TagSource.values()[r.getInt("source")]; TagLabel.TagSource source = TagLabel.TagSource.values()[r.getInt("source")];
if (source == TagLabel.TagSource.CLASSIFICATION) { if (source == TagLabel.TagSource.CLASSIFICATION) {
Tag tag = JsonUtils.readValue(r.getString("json"), Tag.class); Tag tag = JsonUtils.readValue(r.getString("json"), Tag.class);
@ -4704,6 +4707,7 @@ public interface CollectionDAO {
tag.setTagFQN(rs.getString("tagFQN")); tag.setTagFQN(rs.getString("tagFQN"));
tag.setLabelType(rs.getInt("labelType")); tag.setLabelType(rs.getInt("labelType"));
tag.setState(rs.getInt("state")); tag.setState(rs.getInt("state"));
tag.setReason(rs.getString("reason"));
return tag; return tag;
} }
} }
@ -4716,6 +4720,7 @@ public interface CollectionDAO {
private String tagFQN; private String tagFQN;
private int labelType; private int labelType;
private int state; private int state;
private String reason;
// Getters and Setters // Getters and Setters
@ -4725,6 +4730,7 @@ public interface CollectionDAO {
tagLabel.setTagFQN(this.tagFQN); tagLabel.setTagFQN(this.tagFQN);
tagLabel.setLabelType(TagLabel.LabelType.values()[this.labelType]); tagLabel.setLabelType(TagLabel.LabelType.values()[this.labelType]);
tagLabel.setState(TagLabel.State.values()[this.state]); tagLabel.setState(TagLabel.State.values()[this.state]);
tagLabel.setReason(this.reason);
return tagLabel; return tagLabel;
} }
} }
@ -4788,6 +4794,7 @@ public interface CollectionDAO {
List<String> targetFQNHashes = new ArrayList<>(); List<String> targetFQNHashes = new ArrayList<>();
List<Integer> labelTypes = new ArrayList<>(); List<Integer> labelTypes = new ArrayList<>();
List<Integer> states = new ArrayList<>(); List<Integer> states = new ArrayList<>();
List<String> reasons = new ArrayList<>();
for (TagLabel tagLabel : tagLabels) { for (TagLabel tagLabel : tagLabels) {
sources.add(tagLabel.getSource().ordinal()); sources.add(tagLabel.getSource().ordinal());
@ -4796,19 +4803,21 @@ public interface CollectionDAO {
targetFQNHashes.add(targetFQNHash); targetFQNHashes.add(targetFQNHash);
labelTypes.add(tagLabel.getLabelType().ordinal()); labelTypes.add(tagLabel.getLabelType().ordinal());
states.add(tagLabel.getState().ordinal()); states.add(tagLabel.getState().ordinal());
reasons.add(tagLabel.getReason());
} }
applyTagsBatchInternal(sources, tagFQNs, tagFQNHashes, targetFQNHashes, labelTypes, states); applyTagsBatchInternal(
sources, tagFQNs, tagFQNHashes, targetFQNHashes, labelTypes, states, reasons);
} }
@Transaction @Transaction
@ConnectionAwareSqlBatch( @ConnectionAwareSqlBatch(
value = value =
"INSERT IGNORE INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state) VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state)", "INSERT IGNORE INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state, reason) VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state, :reason)",
connectionType = MYSQL) connectionType = MYSQL)
@ConnectionAwareSqlBatch( @ConnectionAwareSqlBatch(
value = value =
"INSERT INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state) VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state) ON CONFLICT (source, tagFQNHash, targetFQNHash) DO NOTHING", "INSERT INTO tag_usage (source, tagFQN, tagFQNHash, targetFQNHash, labelType, state, reason) VALUES (:source, :tagFQN, :tagFQNHash, :targetFQNHash, :labelType, :state, :reason) ON CONFLICT (source, tagFQNHash, targetFQNHash) DO NOTHING",
connectionType = POSTGRES) connectionType = POSTGRES)
void applyTagsBatchInternal( void applyTagsBatchInternal(
@Bind("source") List<Integer> sources, @Bind("source") List<Integer> sources,
@ -4816,7 +4825,8 @@ public interface CollectionDAO {
@Bind("tagFQNHash") List<String> tagFQNHashes, @Bind("tagFQNHash") List<String> tagFQNHashes,
@Bind("targetFQNHash") List<String> targetFQNHashes, @Bind("targetFQNHash") List<String> targetFQNHashes,
@Bind("labelType") List<Integer> labelTypes, @Bind("labelType") List<Integer> labelTypes,
@Bind("state") List<Integer> states); @Bind("state") List<Integer> states,
@Bind("reason") List<String> reasons);
/** /**
* Delete multiple tags in batch to improve performance * Delete multiple tags in batch to improve performance

View File

@ -2465,7 +2465,8 @@ public abstract class EntityRepository<T extends EntityInterface> {
tagLabel.getTagFQN(), tagLabel.getTagFQN(),
targetFQN, targetFQN,
tagLabel.getLabelType().ordinal(), tagLabel.getLabelType().ordinal(),
tagLabel.getState().ordinal()); tagLabel.getState().ordinal(),
tagLabel.getReason());
// Update RDF store // Update RDF store
org.openmetadata.service.rdf.RdfTagUpdater.applyTag(tagLabel, targetFQN); org.openmetadata.service.rdf.RdfTagUpdater.applyTag(tagLabel, targetFQN);

View File

@ -202,7 +202,8 @@ class CacheWarmupIntegrationTest extends CachedOpenMetadataApplicationResourceTe
tagHash, tagHash,
table.getFullyQualifiedName(), table.getFullyQualifiedName(),
LabelType.MANUAL.ordinal(), LabelType.MANUAL.ordinal(),
State.CONFIRMED.ordinal()); State.CONFIRMED.ordinal(),
"Applied for testing purposes");
LOG.debug("Applied tag {} to table {}", tagFQN, table.getName()); LOG.debug("Applied tag {} to table {}", tagFQN, table.getName());
} }
@ -423,7 +424,8 @@ class CacheWarmupIntegrationTest extends CachedOpenMetadataApplicationResourceTe
tagHash, tagHash,
table.getFullyQualifiedName(), table.getFullyQualifiedName(),
LabelType.MANUAL.ordinal(), LabelType.MANUAL.ordinal(),
State.CONFIRMED.ordinal()); State.CONFIRMED.ordinal(),
"Applied for testing purposes");
long currentUsage = RelationshipCache.getTagUsage(testTagFQN); long currentUsage = RelationshipCache.getTagUsage(testTagFQN);
} }

View File

@ -216,7 +216,8 @@ class CacheWarmupServiceTest extends CachedOpenMetadataApplicationResourceTest {
"test-tag-hash-" + i, "test-tag-hash-" + i,
entityFQN, entityFQN,
LabelType.MANUAL.ordinal(), LabelType.MANUAL.ordinal(),
State.CONFIRMED.ordinal()); State.CONFIRMED.ordinal(),
"Applied for testing purposes");
} }
} }

View File

@ -249,7 +249,8 @@ public class TagUsageCacheTest extends CachedOpenMetadataApplicationResourceTest
testTagFQNHash, testTagFQNHash,
testEntityFQNHash, testEntityFQNHash,
LabelType.MANUAL.ordinal(), LabelType.MANUAL.ordinal(),
State.CONFIRMED.ordinal()); State.CONFIRMED.ordinal(),
"Applied for testing purposes");
// Verify tag usage counter was updated // Verify tag usage counter was updated
long tagUsage = RelationshipCache.getTagUsage(testTagFQN); long tagUsage = RelationshipCache.getTagUsage(testTagFQN);
@ -274,7 +275,8 @@ public class TagUsageCacheTest extends CachedOpenMetadataApplicationResourceTest
testTagFQNHash, testTagFQNHash,
testEntityFQNHash, testEntityFQNHash,
LabelType.MANUAL.ordinal(), LabelType.MANUAL.ordinal(),
State.CONFIRMED.ordinal()); State.CONFIRMED.ordinal(),
"Applied for testing purposes");
// First call should be a cache miss // First call should be a cache miss
List<TagLabel> firstResult = tagUsageDAO.getTags(testEntityFQNHash); List<TagLabel> firstResult = tagUsageDAO.getTags(testEntityFQNHash);
@ -312,7 +314,8 @@ public class TagUsageCacheTest extends CachedOpenMetadataApplicationResourceTest
testTagFQNHash, testTagFQNHash,
testEntityFQNHash, testEntityFQNHash,
LabelType.MANUAL.ordinal(), LabelType.MANUAL.ordinal(),
State.CONFIRMED.ordinal()); State.CONFIRMED.ordinal(),
"Applied for testing purposes");
// First batch call should be a cache miss // First batch call should be a cache miss
List<CollectionDAO.TagUsageDAO.TagLabelWithFQNHash> firstBatchResult = List<CollectionDAO.TagUsageDAO.TagLabelWithFQNHash> firstBatchResult =
@ -378,7 +381,8 @@ public class TagUsageCacheTest extends CachedOpenMetadataApplicationResourceTest
testTagFQNHash, testTagFQNHash,
testEntityFQNHash, testEntityFQNHash,
LabelType.MANUAL.ordinal(), LabelType.MANUAL.ordinal(),
State.CONFIRMED.ordinal()); State.CONFIRMED.ordinal(),
"Applied for testing purposes");
// Verify tag usage counter // Verify tag usage counter
long initialUsage = RelationshipCache.getTagUsage(testTagFQN); long initialUsage = RelationshipCache.getTagUsage(testTagFQN);
@ -458,7 +462,8 @@ public class TagUsageCacheTest extends CachedOpenMetadataApplicationResourceTest
tagHash, tagHash,
testEntityFQNHash, testEntityFQNHash,
LabelType.MANUAL.ordinal(), LabelType.MANUAL.ordinal(),
State.CONFIRMED.ordinal()); State.CONFIRMED.ordinal(),
"Applied for testing purposes");
// Get tags and verify they exist // Get tags and verify they exist
List<TagLabel> tags = tagUsageDAO.getTags(testEntityFQNHash); List<TagLabel> tags = tagUsageDAO.getTags(testEntityFQNHash);

View File

@ -77,17 +77,7 @@ import java.io.IOException;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -5326,4 +5316,69 @@ public class TableResourceTest extends EntityResourceTest<Table, CreateTable> {
dataProductTest.deleteEntity(dataProduct.getId(), false, true, ADMIN_AUTH_HEADERS); dataProductTest.deleteEntity(dataProduct.getId(), false, true, ADMIN_AUTH_HEADERS);
domainTest.deleteEntity(domain.getId(), false, true, ADMIN_AUTH_HEADERS); domainTest.deleteEntity(domain.getId(), false, true, ADMIN_AUTH_HEADERS);
} }
/**
 * Verifies that {@code TagLabel.reason} is returned after patching column tags.
 *
 * <p>Scenario:
 *
 * <ol>
 *   <li>Patch the column with PII.Sensitive carrying the reason "Classified with score 1.0".
 *   <li>Patch again, adding PersonalData.Personal with no reason, and check that the first tag
 *       keeps its reason while the second one reports {@code null}.
 * </ol>
 */
@Test
void test_columnWithMultipleTags_withClassificationReason(TestInfo test) throws IOException {
  final String classificationReason = "Classified with score 1.0";

  Column column = getColumn(C1, ColumnDataType.STRING, null);
  CreateTable request = createRequest(test).withColumns(List.of(column));
  Table table = createEntity(request, ADMIN_AUTH_HEADERS);

  TagLabel personalTagLabel = PERSONAL_DATA_TAG_LABEL;
  // Work on a private copy: withReason(...) is a fluent setter on the label it is called on, so
  // calling it directly on the shared PII_SENSITIVE_TAG_LABEL constant would leak the reason
  // into every other test that uses that constant.
  TagLabel sensitiveTagLabel =
      JsonUtils.readValue(JsonUtils.pojoToJson(PII_SENSITIVE_TAG_LABEL), TagLabel.class)
          .withReason(classificationReason);

  // Step 1: tag the column with the classified (reasoned) label only
  Column columnWithAutoClassification = column.withTags(List.of(sensitiveTagLabel));

  String originalTable = JsonUtils.pojoToJson(table);
  table = table.withColumns(List.of(columnWithAutoClassification));

  Table patchedTable = patchEntity(table.getId(), originalTable, table, ADMIN_AUTH_HEADERS);

  assertNotNull(patchedTable.getColumns());
  assertEquals(1, patchedTable.getColumns().size());

  Column patchedColumn = patchedTable.getColumns().getFirst();
  List<TagLabel> tags = patchedColumn.getTags();
  assertNotNull(tags);
  assertEquals(1, tags.size());

  TagLabel piiTag = tags.getFirst();
  assertNotNull(piiTag);
  assertEquals("Sensitive", piiTag.getName());
  assertEquals("PII.Sensitive", piiTag.getTagFQN());
  assertEquals(classificationReason, piiTag.getReason());

  // Step 2: additionally apply the personal tag manually (no reason)
  Column columnWithBothTags = column.withTags(List.of(sensitiveTagLabel, personalTagLabel));

  originalTable = JsonUtils.pojoToJson(patchedTable);
  table = patchedTable.withColumns(List.of(columnWithBothTags));

  patchedTable = patchEntity(table.getId(), originalTable, table, ADMIN_AUTH_HEADERS);

  assertNotNull(patchedTable.getColumns());
  assertEquals(1, patchedTable.getColumns().size());

  patchedColumn = patchedTable.getColumns().getFirst();
  tags = patchedColumn.getTags();
  assertNotNull(tags);
  assertEquals(2, tags.size());

  // The existing tag must keep its reason after the second patch
  piiTag = tags.getFirst();
  assertNotNull(piiTag);
  assertEquals("Sensitive", piiTag.getName());
  assertEquals("PII.Sensitive", piiTag.getTagFQN());
  assertEquals(classificationReason, piiTag.getReason());

  // The manually applied tag was given no reason, so none must be reported
  TagLabel personalTag = tags.getLast();
  assertNotNull(personalTag);
  assertEquals("Personal", personalTag.getName());
  assertEquals("PersonalData.Personal", personalTag.getTagFQN());
  assertNull(personalTag.getReason());
}
} }