Fixes #8506 Build tag and glossary term cache and use it instead of database lookups in list operations (#8686)

This commit is contained in:
Suresh Srinivas 2022-11-11 14:51:41 -08:00 committed by GitHub
parent 072e190dd3
commit 4d15696d71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 140 additions and 116 deletions

View File

@ -103,6 +103,7 @@ import org.openmetadata.service.jdbi3.CollectionDAO.UsageDAO.UsageDetailsMapper;
import org.openmetadata.service.jdbi3.FeedRepository.FilterType;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlQuery;
import org.openmetadata.service.jdbi3.locator.ConnectionAwareSqlUpdate;
import org.openmetadata.service.resources.tags.TagLabelCache;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
@ -499,17 +500,6 @@ public interface CollectionDAO {
@Bind("relation") int relation,
@Bind("toEntity") String toEntity);
/**
 * Counts the rows of {@code entity_relationship} that start at the given entity with the given relation.
 *
 * <p>The query carries no {@code ORDER BY}: it returns a single aggregate row, and ordering an ungrouped
 * {@code count(*)} by a non-aggregated column is rejected by PostgreSQL.
 *
 * @param fromId id matched against the {@code fromId} column
 * @param fromEntity value matched against the {@code fromEntity} column
 * @param relation value matched against the {@code relation} column
 * @param toEntity when non-null, only rows with this {@code toEntity} are counted; null disables the filter
 * @return number of matching relationship rows
 */
@SqlQuery(
    "SELECT count(*) FROM entity_relationship "
        + "WHERE fromId = :fromId AND fromEntity = :fromEntity AND relation = :relation "
        + "AND (toEntity = :toEntity OR :toEntity IS NULL)")
int findToCount(
    @Bind("fromId") String fromId,
    @Bind("fromEntity") String fromEntity,
    @Bind("relation") int relation,
    @Bind("toEntity") String toEntity);
//
// Find from operations
//
@ -1269,17 +1259,6 @@ public interface CollectionDAO {
@Bind("type") ThreadType type,
@Bind("resolved") boolean resolved);
/**
 * Counts threads per {@code entityLink} among the given thread ids.
 *
 * <p>Rows of {@code thread_entity} are restricted to the supplied ids and to the given {@code resolved} value,
 * then grouped by {@code entityLink}.
 *
 * @param threadIds ids expanded into the {@code IN (...)} list
 * @param type when non-null, only threads of this type are counted; null disables the filter
 * @param status when non-null, only threads with this {@code taskStatus} are counted; null disables the filter
 * @param isResolved value matched against the {@code resolved} column
 * @return one entry per entity link, as produced by {@code CountFieldMapper} (mapper not visible here —
 *     presumably the link followed by its count; confirm against the mapper)
 */
@SqlQuery(
"SELECT entityLink, COUNT(id) count FROM thread_entity WHERE (id IN (<threadIds>)) "
+ "AND resolved= :isResolved AND (:type IS NULL OR type = :type) "
+ "AND (:status IS NULL OR taskStatus = :status) GROUP BY entityLink")
@RegisterRowMapper(CountFieldMapper.class)
List<List<String>> listCountByThreads(
@BindList("threadIds") List<String> threadIds,
@Bind("type") ThreadType type,
@Bind("status") TaskStatus status,
@Bind("isResolved") boolean isResolved);
@SqlQuery(
"SELECT json FROM thread_entity WHERE updatedAt > :before AND resolved = :resolved AND "
+ "(:type IS NULL OR type = :type) AND entityId in ("
@ -1433,26 +1412,6 @@ public interface CollectionDAO {
@Bind("toType") String toType,
@Bind("relation") int relation);
/**
 * Lists rows of {@code field_relationship} whose {@code toFQN} starts with the given prefix and whose
 * {@code fromType}, {@code toType} and {@code relation} match exactly.
 *
 * <p>Rows are mapped by {@code FromFieldMapper}, which places {@code toFQN} first and {@code fromFQN} second
 * in the returned triple.
 *
 * @param fqnPrefix prefix matched against {@code toFQN} using {@code LIKE CONCAT(:fqnPrefix, '%')}
 * @param fromType value matched against the {@code fromType} column
 * @param toType value matched against the {@code toType} column
 * @param relation value matched against the {@code relation} column
 * @return triples of (toFQN, fromFQN, json)
 */
@SqlQuery(
"SELECT fromFQN, toFQN, json FROM field_relationship WHERE "
+ "toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType = :fromType AND toType = :toType AND relation = :relation")
@RegisterRowMapper(FromFieldMapper.class)
List<Triple<String, String, String>> listFromByPrefix(
@Bind("fqnPrefix") String fqnPrefix,
@Bind("fromType") String fromType,
@Bind("toType") String toType,
@Bind("relation") int relation);
/**
 * Lists rows of {@code field_relationship} whose {@code toFQN} starts with the given prefix, whose
 * {@code toType} starts with the given type prefix, and whose {@code fromType} and {@code relation}
 * match exactly.
 *
 * <p>Rows are mapped by {@code FromFieldMapper}, which places {@code toFQN} first and {@code fromFQN} second
 * in the returned triple.
 *
 * @param fqnPrefix prefix matched against {@code toFQN} using {@code LIKE CONCAT(:fqnPrefix, '%')}
 * @param fromType value matched against the {@code fromType} column
 * @param toType prefix matched against {@code toType} using {@code LIKE CONCAT(:toType, '%')}
 * @param relation value matched against the {@code relation} column
 * @return triples of (toFQN, fromFQN, json)
 */
@SqlQuery(
"SELECT fromFQN, toFQN, json FROM field_relationship WHERE "
+ "toFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType = :fromType AND toType LIKE CONCAT(:toType, '%') AND relation = :relation")
@RegisterRowMapper(FromFieldMapper.class)
List<Triple<String, String, String>> listFromByAllPrefix(
@Bind("fqnPrefix") String fqnPrefix,
@Bind("fromType") String fromType,
@Bind("toType") String toType,
@Bind("relation") int relation);
@SqlQuery(
"SELECT fromFQN, toFQN, json FROM field_relationship WHERE "
+ "fromFQN LIKE CONCAT(:fqnPrefix, '%') AND fromType = :fromType AND toType = :toType "
@ -1517,13 +1476,6 @@ public interface CollectionDAO {
return Triple.of(rs.getString("fromFQN"), rs.getString("toFQN"), rs.getString("json"));
}
}
/**
 * Maps a {@code field_relationship} row to a triple ordered as (toFQN, fromFQN, json), i.e. with the
 * "to" side first.
 */
class FromFieldMapper implements RowMapper<Triple<String, String, String>> {
  @Override
  public Triple<String, String, String> map(ResultSet rs, StatementContext ctx) throws SQLException {
    // Columns are read in the same order they appear in the resulting triple.
    final String toFqn = rs.getString("toFQN");
    final String fromFqn = rs.getString("fromFQN");
    final String json = rs.getString("json");
    return Triple.of(toFqn, fromFqn, json);
  }
}
}
interface BotDAO extends EntityDAO<Bot> {
@ -1923,27 +1875,14 @@ public interface CollectionDAO {
/**
 * Lists the {@code targetFQN} of every {@code tag_usage} row that uses the given tag.
 *
 * @param tagFQN value matched against the {@code tagFQN} column
 * @return target FQNs of the matching usage rows
 */
@SqlQuery("SELECT targetFQN FROM tag_usage WHERE tagFQN = :tagFQN")
List<String> tagTargetFQN(@Bind("tagFQN") String tagFQN);
/**
 * Lists the tag labels applied to a target, resolving each label's description in SQL.
 *
 * <p>{@code tag_usage} is left-joined to {@code tag} for rows with {@code source = 0} and to
 * {@code glossary_term_entity} for rows with {@code source = 1}. The description is extracted from each joined
 * JSON document as {@code description1} and {@code description2}. The MySQL and PostgreSQL variants differ only
 * in the JSON path syntax ({@code '$.description'} versus {@code 'description'}). Results are ordered by
 * {@code tagFQN}.
 *
 * @param targetFQN value matched against {@code tag_usage.targetFQN}
 * @return the tag labels for the target
 */
@ConnectionAwareSqlQuery(
value =
"SELECT tu.source, tu.tagFQN, tu.labelType, tu.state, "
+ "t.json ->> '$.description' AS description1, "
+ "g.json ->> '$.description' AS description2 "
+ "FROM tag_usage tu "
+ "LEFT JOIN tag t ON tu.tagFQN = t.fullyQualifiedName AND tu.source = 0 "
+ "LEFT JOIN glossary_term_entity g ON tu.tagFQN = g.fullyQualifiedName AND tu.source = 1 "
+ "WHERE tu.targetFQN = :targetFQN ORDER BY tu.tagFQN",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT tu.source, tu.tagFQN, tu.labelType, tu.state, "
+ "t.json ->> 'description' AS description1, "
+ "g.json ->> 'description' AS description2 "
+ "FROM tag_usage tu "
+ "LEFT JOIN tag t ON tu.tagFQN = t.fullyQualifiedName AND tu.source = 0 "
+ "LEFT JOIN glossary_term_entity g ON tu.tagFQN = g.fullyQualifiedName AND tu.source = 1 "
+ "WHERE tu.targetFQN = :targetFQN ORDER BY tu.tagFQN",
connectionType = POSTGRES)
List<TagLabel> getTags(@Bind("targetFQN") String targetFQN);
/**
 * Lists the tag labels applied to a target and fills in each label's description from {@link TagLabelCache}
 * rather than joining the tag and glossary term tables.
 *
 * @param targetFQN fully qualified name of the labeled target
 * @return the labels returned by {@code getTagsInternal}, each with its description set
 */
default List<TagLabel> getTags(String targetFQN) {
  List<TagLabel> labels = getTagsInternal(targetFQN);
  for (TagLabel label : labels) {
    // The usage table stores no description; it is looked up per label through the cache.
    String description = TagLabelCache.getInstance().getDescription(label);
    label.setDescription(description);
  }
  return labels;
}
/**
 * Reads the raw tag usage rows for a target, ordered by {@code tagFQN}.
 *
 * <p>Only {@code source}, {@code tagFQN}, {@code labelType} and {@code state} are selected; no description
 * column is read.
 *
 * @param targetFQN value matched against the {@code targetFQN} column
 * @return the tag labels for the target
 */
@SqlQuery("SELECT source, tagFQN, labelType, state FROM tag_usage WHERE targetFQN = :targetFQN ORDER BY tagFQN")
List<TagLabel> getTagsInternal(@Bind("targetFQN") String targetFQN);
/**
 * Counts the {@code tag_usage} rows with the given source whose {@code tagFQN} starts with the given prefix.
 *
 * @param source value matched against the {@code source} column
 * @param fqnPrefix prefix matched against {@code tagFQN} using {@code LIKE CONCAT(:fqnPrefix, '%')}
 * @return number of matching usage rows
 */
@SqlQuery("SELECT COUNT(*) FROM tag_usage WHERE tagFQN LIKE CONCAT(:fqnPrefix, '%') AND source = :source")
int getTagCount(@Bind("source") int source, @Bind("fqnPrefix") String fqnPrefix);
@ -1963,14 +1902,11 @@ public interface CollectionDAO {
class TagLabelMapper implements RowMapper<TagLabel> {
@Override
public TagLabel map(ResultSet r, StatementContext ctx) throws SQLException {
String description1 = r.getString("description1");
String description2 = r.getString("description2");
return new TagLabel()
.withSource(TagLabel.TagSource.values()[r.getInt("source")])
.withLabelType(TagLabel.LabelType.values()[r.getInt("labelType")])
.withState(TagLabel.State.values()[r.getInt("state")])
.withTagFQN(r.getString("tagFQN"))
.withDescription(description1 == null ? description2 : description1);
.withTagFQN(r.getString("tagFQN"));
}
}
}
@ -2168,22 +2104,10 @@ public interface CollectionDAO {
@Bind("limit") int limit,
@Bind("after") String after);
/**
 * Convenience overload that runs the two-argument query with the ordinal of {@code Relationship.HAS}.
 *
 * @param teamId team id forwarded to the query
 * @return user ids returned by the two-argument overload
 */
default List<String> listUsersUnderOrganization(String teamId) {
  final int hasRelation = Relationship.HAS.ordinal();
  return listUsersUnderOrganization(teamId, hasRelation);
}
/**
 * Convenience overload that runs the two-argument query with the ordinal of {@code Relationship.PARENT_OF}.
 *
 * @param teamId team id forwarded to the query
 * @return team ids returned by the two-argument overload
 */
default List<String> listTeamsUnderOrganization(String teamId) {
  final int parentOfRelation = Relationship.PARENT_OF.ordinal();
  return listTeamsUnderOrganization(teamId, parentOfRelation);
}
/**
 * Returns the union of (a) every {@code user_entity} id that differs from {@code teamId} and (b) the
 * {@code toId} of every team-to-user row in {@code entity_relationship} with the given relation whose
 * {@code fromId} differs from {@code teamId}.
 *
 * <p>The entity type names are written as single-quoted string literals. Backticks quote identifiers in MySQL
 * and are not valid in PostgreSQL, so they must not be used for these values.
 *
 * <p>NOTE(review): the first branch already selects all users except one whose id equals {@code teamId};
 * confirm this is the intended semantics.
 *
 * @param teamId id excluded from both branches of the union
 * @param relation value matched against the {@code relation} column
 * @return ids produced by the union
 */
@SqlQuery(
    "SELECT ue.id "
        + "FROM user_entity ue "
        + "WHERE ue.id NOT IN (SELECT :teamId) UNION "
        + "(SELECT toId FROM entity_relationship "
        + "WHERE fromId != :teamId AND fromEntity = 'team' AND relation = :relation AND toEntity = 'user')")
List<String> listUsersUnderOrganization(@Bind("teamId") String teamId, @Bind("relation") int relation);
@SqlQuery(
"SELECT te.id "
+ "FROM team_entity te "
@ -2612,9 +2536,6 @@ public interface CollectionDAO {
/**
 * Counts the {@code user_entity} rows with the given email. The same statement is registered for both MySQL
 * and PostgreSQL.
 *
 * @param email value matched against the {@code email} column
 * @return number of matching rows; a non-zero value means the email is present
 */
@ConnectionAwareSqlQuery(value = "SELECT count(*) FROM user_entity WHERE email = :email", connectionType = MYSQL)
@ConnectionAwareSqlQuery(value = "SELECT count(*) FROM user_entity WHERE email = :email", connectionType = POSTGRES)
int checkEmailExists(@Bind("email") String email);
/**
 * Reads the {@code json} column of the {@code user_entity} row with the given email.
 *
 * @param email value matched against the {@code email} column
 * @return the stored JSON of the matching row
 */
@SqlQuery(value = "SELECT json FROM user_entity WHERE email = :email")
String findUserByEmail(@Bind("email") String email);
}
interface ChangeEventDAO {
@ -3051,12 +2972,6 @@ public interface CollectionDAO {
+ "ORDER BY timestamp DESC LIMIT 1")
String getLatestExtension(@Bind("entityFQN") String entityFQN, @Bind("extension") String extension);
/**
 * Reads the most recent {@code entity_extension_time_series} entries for an entity and extension, newest first.
 *
 * @param entityFQN value matched against the {@code entityFQN} column
 * @param extension value matched against the {@code extension} column
 * @param limit maximum number of rows returned
 * @return the {@code json} of up to {@code limit} rows, ordered by {@code timestamp} descending
 */
@SqlQuery(
"SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND extension = :extension "
+ "ORDER BY timestamp DESC LIMIT :limit")
List<String> getLastLatestExtension(
@Bind("entityFQN") String entityFQN, @Bind("extension") String extension, @Bind("limit") int limit);
@RegisterRowMapper(ExtensionMapper.class)
@SqlQuery(
"SELECT extension, json FROM entity_extension WHERE id = :id AND extension "
@ -3075,9 +2990,6 @@ public interface CollectionDAO {
void deleteAtTimestamp(
@Bind("entityFQN") String entityFQN, @Bind("extension") String extension, @Bind("timestamp") Long timestamp);
/**
 * Lists the {@code json} of every {@code entity_extension_time_series} row for the given entity and JSON schema.
 * The query specifies no ordering.
 *
 * @param entityFQN value matched against the {@code entityFQN} column
 * @param jsonSchema value matched against the {@code jsonSchema} column
 * @return the stored JSON of the matching rows
 */
@SqlQuery("SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN and jsonSchema = :jsonSchema")
List<String> listByFQN(@Bind("entityFQN") String entityFQN, @Bind("jsonSchema") String jsonSchema);
@SqlQuery(
"SELECT json FROM entity_extension_time_series WHERE entityFQN = :entityFQN AND jsonSchema = :jsonSchema "
+ "ORDER BY timestamp DESC LIMIT 1")
@ -3198,7 +3110,7 @@ public interface CollectionDAO {
value = JsonUtils.readValue(json, TestResultNotificationConfiguration.class);
break;
default:
throw new RuntimeException("Invalid Settings Type");
throw new IllegalArgumentException("Invalid Settings Type " + configType);
}
} catch (IOException e) {
throw new RuntimeException(e);
@ -3242,22 +3154,18 @@ public interface CollectionDAO {
public static TokenInterface getToken(TokenType type, String json) throws IOException {
TokenInterface resp;
try {
switch (type) {
case EMAIL_VERIFICATION:
resp = JsonUtils.readValue(json, EmailVerificationToken.class);
break;
case PASSWORD_RESET:
resp = JsonUtils.readValue(json, PasswordResetToken.class);
break;
case REFRESH_TOKEN:
resp = JsonUtils.readValue(json, RefreshToken.class);
break;
default:
throw new RuntimeException("Invalid Token Type.");
}
} catch (IOException e) {
throw e;
switch (type) {
case EMAIL_VERIFICATION:
resp = JsonUtils.readValue(json, EmailVerificationToken.class);
break;
case PASSWORD_RESET:
resp = JsonUtils.readValue(json, PasswordResetToken.class);
break;
case REFRESH_TOKEN:
resp = JsonUtils.readValue(json, RefreshToken.class);
break;
default:
throw new RuntimeException("Invalid Token Type.");
}
return resp;
}

View File

@ -0,0 +1,115 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.resources.tags;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.data.GlossaryTerm;
import org.openmetadata.schema.entity.tags.Tag;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.TagLabel.TagSource;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.util.EntityUtil.Fields;
/**
 * Both GlossaryTerm and Tags are used for labeling entities. This class caches GlossaryTerm and Tag entities, keyed
 * by fully qualified name, for quick look up.
 *
 * <p>{@link #initialize()} creates the caches and resolves the repositories; the lookup methods use those static
 * fields directly, so it has to run before the first lookup. Each cache holds at most 1000 entries and drops an
 * entry one minute after it was last accessed.
 */
@Slf4j
public class TagLabelCache {
  private static final TagLabelCache INSTANCE = new TagLabelCache();
  private static volatile boolean INITIALIZED = false;
  protected static EntityRepository<Tag> TAG_REPOSITORY;
  protected static EntityRepository<GlossaryTerm> GLOSSARY_TERM_REPOSITORY;

  // Tag fqn to Tag information
  protected static LoadingCache<String, Tag> TAG_CACHE;

  // Glossary term fqn to GlossaryTerm information
  protected static LoadingCache<String, GlossaryTerm> GLOSSARY_CACHE;

  /**
   * Builds both caches and resolves the tag and glossary term repositories. Expected to be called only once from
   * the TagResource during initialization; later calls only log that the cache is already set up.
   */
  public static void initialize() {
    if (!INITIALIZED) {
      TAG_CACHE =
          CacheBuilder.newBuilder().maximumSize(1000).expireAfterAccess(1, TimeUnit.MINUTES).build(new TagLoader());
      GLOSSARY_CACHE =
          CacheBuilder.newBuilder()
              .maximumSize(1000)
              .expireAfterAccess(1, TimeUnit.MINUTES)
              .build(new GlossaryTermLoader());
      TAG_REPOSITORY = Entity.getEntityRepository(Entity.TAG);
      GLOSSARY_TERM_REPOSITORY = Entity.getEntityRepository(Entity.GLOSSARY_TERM);
      INITIALIZED = true;
    } else {
      LOG.info("Tag label cache is already initialized");
    }
  }

  /** Returns the shared instance of this cache. */
  public static TagLabelCache getInstance() {
    return INSTANCE;
  }

  /**
   * Returns the tag with the given fully qualified name, loading it through the tag repository on a cache miss.
   *
   * @param tagFqn fully qualified name of the tag
   * @return the cached or freshly loaded tag
   * @throws EntityNotFoundException if loading the tag fails for any reason
   */
  public Tag getTag(String tagFqn) {
    try {
      return TAG_CACHE.get(tagFqn);
    } catch (ExecutionException | UncheckedExecutionException ex) {
      // Any loader failure is reported to callers as "not found", carrying the wrapper exception's message.
      throw new EntityNotFoundException(ex.getMessage());
    }
  }

  /**
   * Returns the glossary term with the given fully qualified name, loading it through the glossary term repository
   * on a cache miss.
   *
   * @param glossaryTermFqn fully qualified name of the glossary term
   * @return the cached or freshly loaded glossary term
   * @throws EntityNotFoundException if loading the glossary term fails for any reason
   */
  public GlossaryTerm getGlossaryTerm(String glossaryTermFqn) {
    try {
      return GLOSSARY_CACHE.get(glossaryTermFqn);
    } catch (ExecutionException | UncheckedExecutionException ex) {
      // Any loader failure is reported to callers as "not found", carrying the wrapper exception's message.
      throw new EntityNotFoundException(ex.getMessage());
    }
  }

  /**
   * Returns the description of the tag or glossary term that the label refers to.
   *
   * @param label label whose source selects the cache and whose tag FQN is the lookup key
   * @return description of the referenced tag or glossary term
   * @throws IllegalArgumentException if the label's source is neither {@code TAG} nor {@code GLOSSARY}
   * @throws EntityNotFoundException if the referenced tag or glossary term cannot be loaded
   */
  public String getDescription(TagLabel label) {
    if (label.getSource() == TagSource.TAG) {
      return getTag(label.getTagFQN()).getDescription();
    } else if (label.getSource() == TagSource.GLOSSARY) {
      return getGlossaryTerm(label.getTagFQN()).getDescription();
    } else {
      throw new IllegalArgumentException("Invalid source type " + label.getSource());
    }
  }

  /** Loads a {@link Tag} by name from the tag repository when the tag cache misses. */
  static class TagLoader extends CacheLoader<String, Tag> {
    @Override
    public Tag load(@CheckForNull String tagName) throws IOException {
      Tag tag = TAG_REPOSITORY.getByName(null, tagName, Fields.EMPTY_FIELDS);
      LOG.info("Loaded tag {}:{}", tag.getName(), tag.getId());
      return tag;
    }
  }

  /** Loads a {@link GlossaryTerm} by name from the glossary term repository when the glossary cache misses. */
  static class GlossaryTermLoader extends CacheLoader<String, GlossaryTerm> {
    @Override
    public GlossaryTerm load(@CheckForNull String glossaryTermName) throws IOException {
      GlossaryTerm glossaryTerm = GLOSSARY_TERM_REPOSITORY.getByName(null, glossaryTermName, Fields.EMPTY_FIELDS);
      LOG.info("Loaded glossary term {}:{}", glossaryTerm.getName(), glossaryTerm.getId());
      return glossaryTerm;
    }
  }
}

View File

@ -111,6 +111,7 @@ public class TagResource {
});
daoCategory.initCategory(tagCategory);
}
TagLabelCache.initialize();
}
static final String FIELDS = "usageCount";

View File

@ -78,7 +78,6 @@ public class SubjectCache {
try {
return USER_CACHE.get(userName);
} catch (ExecutionException | UncheckedExecutionException ex) {
ex.printStackTrace();
throw new EntityNotFoundException(ex.getMessage());
}
}

View File

@ -2016,6 +2016,7 @@ public abstract class EntityResourceTest<T extends EntityInterface, K extends Cr
List<TagLabel> expectedTags = (List<TagLabel>) expected;
List<TagLabel> actualTags = JsonUtils.readObjects(actual.toString(), TagLabel.class);
assertTrue(actualTags.containsAll(expectedTags));
actualTags.forEach(tagLabel -> assertNotNull(tagLabel.getDescription()));
} else if (fieldName.startsWith("extension")) { // Custom properties related extension field changes
assertEquals(expected.toString(), actual.toString());
} else {